helix

package module
v0.5.0
Published: Dec 14, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

README

Helix

Helix is a high-availability dual-database client library for Go, designed to support "Shared Nothing" architecture with active-active dual writes, sticky reads, and asynchronous reconciliation.

Why "Helix"?

Biomimetic Fault Tolerance.

Helix is named after the DNA double helix: two independent strands carrying the same genetic code.

In this architecture, your database clusters are the strands. They share nothing—no state, no gossip, no master-slave tether. They exist in parallel universes.

  • Dual Writes replicate your data to both strands simultaneously.
  • Sticky Reads latch onto a single strand for maximum locality.
  • Replay acts as the repair enzyme, asynchronously healing "mutations" (inconsistencies) when a strand temporarily fails.

If one strand snaps, the other keeps the organism alive. It's 4 billion years of evolution applied to high-availability engineering. 🧬

Features

  • Dual Active-Active Writes - Concurrent writes to two independent clusters for maximum availability
  • Sticky Read Routing - Per-client sticky reads to maximize cache hits across clusters
  • Active Failover - Immediate failover to secondary cluster on read failures
  • Replay System - Asynchronous reconciliation via in-memory queue or NATS JetStream
  • Drop-in Replacement - Interface-based design mirrors gocql API for minimal migration effort
  • SQL Support - Simple wrapper for database/sql with dual-write semantics

Installation

go get github.com/arloliu/helix

Quick Start

CQL (Cassandra/ScyllaDB)
package main

import (
    "log"
    "time"

    "github.com/arloliu/helix"
    v1 "github.com/arloliu/helix/adapter/cql/v1"
    "github.com/arloliu/helix/policy"
    "github.com/arloliu/helix/replay"
    "github.com/gocql/gocql"
)

func main() {
    // Create gocql sessions for both clusters
    clusterA := gocql.NewCluster("cluster-a.example.com")
    clusterA.Keyspace = "myapp"
    sessionA, _ := clusterA.CreateSession()
    defer sessionA.Close()

    clusterB := gocql.NewCluster("cluster-b.example.com")
    clusterB.Keyspace = "myapp"
    sessionB, _ := clusterB.CreateSession()
    defer sessionB.Close()

    // Create Helix client
    client, err := helix.NewCQLClient(
        v1.NewSession(sessionA),
        v1.NewSession(sessionB),
        helix.WithReplayer(replay.NewMemoryReplayer()),
        helix.WithReadStrategy(policy.NewStickyRead()),
        helix.WithWriteStrategy(policy.NewConcurrentDualWrite()),
        helix.WithFailoverPolicy(policy.NewActiveFailover()),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Dual-write to both clusters
    userID := gocql.TimeUUID()
    err = client.Query(
        "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
        userID, "Alice", "alice@example.com",
    ).Exec()
    if err != nil {
        log.Printf("Both clusters failed: %v", err)
    }
    // If only one cluster failed, it's automatically queued for replay

    // Read with sticky routing and failover
    var name, email string
    err = client.Query(
        "SELECT name, email FROM users WHERE id = ?",
        userID,
    ).Scan(&name, &email)
    if err != nil {
        log.Printf("Read failed on both clusters: %v", err)
    }
}
SQL (PostgreSQL, MySQL, etc.)
package main

import (
    "database/sql"
    "log"

    "github.com/arloliu/helix"
    "github.com/arloliu/helix/replay"
    _ "github.com/lib/pq"
)

func main() {
    // Connect to both databases
    primary, _ := sql.Open("postgres", "host=primary.example.com ...")
    secondary, _ := sql.Open("postgres", "host=secondary.example.com ...")

    // Create Helix SQL client
    client, err := helix.NewSQLClientFromDB(primary, secondary,
        helix.WithReplayer(replay.NewMemoryReplayer()),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Dual-write
    _, err = client.Exec(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        "user-1", "Alice",
    )
    if err != nil {
        log.Printf("Both databases failed: %v", err)
    }
}

Architecture

%%{init:{'theme':'neutral'}}%%
flowchart TD
    Client[Dual-Session Client]

    subgraph Clusters [Cassandra Clusters]
        CA[(Cassandra Cluster A)]
        CB[(Cassandra Cluster B)]
    end

    subgraph ReplaySys [Replay System]
        NATS["NATS JetStream<br/>(DLQ / Replay Log)"]
        Worker[Background Replay Worker]
    end

    %% Dual Write Path
    Client -- "1. Dual Write (Concurrent)" --> CA
    Client -- "1. Dual Write (Concurrent)" --> CB

    %% Failure Path
    Client -- "2. On Failure (e.g., B fails)" --> NATS

    %% Replay Path
    NATS -- "3. Consume Failed Write" --> Worker
    Worker -- "4. Replay Write (Idempotent)" --> CB

    %% --- Stylesheet ---
    classDef app fill:#e3f2fd,stroke:#1565c0,stroke-width:2px,color:#000;
    classDef db fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#000;
    classDef infra fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000;

    class Client app;
    class CA,CB db;
    class NATS,Worker infra;

Strategies & Policies

Write Strategies

  • ConcurrentDualWrite - Writes to both clusters concurrently (default)
  • SyncDualWrite - Writes sequentially (A then B, or B then A)
  • AdaptiveDualWrite - Latency-aware: healthy clusters wait, degraded clusters fire-and-forget

Read Strategies

  • StickyRead - Sticks to one cluster per client instance (default)
  • PrimaryOnlyRead - Always reads from Cluster A
  • RoundRobinRead - Alternates between clusters

Failover Policies

  • ActiveFailover - Immediately tries secondary on failure (default)
  • CircuitBreaker - Switches after N consecutive failures
  • LatencyCircuitBreaker - CircuitBreaker + treats slow responses as soft failures

See Strategy & Policy Documentation for detailed configuration and interaction patterns.

Replay System

Helix provides two replay implementations for handling partial write failures:

  • MemoryReplayer - Volatile; for development and testing
  • NATSReplayer - Durable; for production (requires NATS JetStream)

See Replay System Documentation for detailed usage patterns.
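
As a minimal sketch, the choice between the two usually comes down to durability; this assumes an existing NATS connection nc and the constructor shapes shown elsewhere in this document:

// Durable replay for production; assumes an existing NATS connection nc
js, _ := nc.JetStream()
prodReplayer := replay.NewNATSReplayer(nc, js)

// Volatile replay for development and testing
devReplayer := replay.NewMemoryReplayer()

Either value is then passed to helix.WithReplayer() when creating the client.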

Configuration Options

Production Recommendations

For production dual-cluster deployments, always configure:

  • Replayer - Critical: without a replayer, partial write failures are lost permanently. Use NATSReplayer for durability.
  • ReadStrategy - Improves read performance. StickyRead maximizes cache hits by routing reads to the same cluster.
  • WriteStrategy - Controls write behavior. AdaptiveDualWrite handles degraded clusters gracefully.
  • FailoverPolicy - Enables automatic read failover. ActiveFailover immediately retries on the secondary cluster.

Warning: A warning is logged if you create a dual-cluster client without a Replayer configured.

Minimal Production Example
// Obtain a JetStream context from the NATS connection
js, _ := nc.JetStream()
replayer := replay.NewNATSReplayer(nc, js)

client, err := helix.NewCQLClient(
    v1.NewSession(sessionA),
    v1.NewSession(sessionB),
    // REQUIRED for production: enables failure recovery
    helix.WithReplayer(replayer),
    helix.WithReplayWorker(replay.NewWorker(replayer)),

    // RECOMMENDED: optimizes read/write behavior
    helix.WithReadStrategy(policy.NewStickyRead()),
    helix.WithWriteStrategy(policy.NewAdaptiveDualWrite()),
    helix.WithFailoverPolicy(policy.NewActiveFailover()),
)
All Configuration Options
helix.NewCQLClient(sessionA, sessionB,
    // Strategies
    helix.WithReadStrategy(policy.NewStickyRead(
        policy.WithStickyReadCooldown(5*time.Minute), // Prevent rapid cluster switching
    )),
    helix.WithWriteStrategy(policy.NewConcurrentDualWrite()),
    helix.WithFailoverPolicy(policy.NewActiveFailover()),

    // Replay
    helix.WithReplayer(replayer),
    helix.WithReplayWorker(worker),  // Optional: auto-start worker

    // Timestamps (critical for idempotency)
    helix.WithTimestampProvider(func() int64 {
        return time.Now().UnixMicro()
    }),
)

Examples

See the examples directory.

Requirements

  • Go 1.25+
  • For CQL: github.com/gocql/gocql (v1) or github.com/apache/cassandra-gocql-driver (v2)
  • For NATS Replay: github.com/nats-io/nats.go

License

MIT License - see LICENSE for details.

Documentation

Overview

Package helix provides a high-availability dual-database client library designed to support "Shared Nothing" architecture.

Helix provides robustness through active-active dual writes, sticky reads, and asynchronous reconciliation for independent Cassandra clusters.

Key Features

  • Dual Active-Active Writes: Concurrent writes to two independent clusters
  • Sticky Read Routing: Per-client sticky reads to maximize cache hits
  • Active Failover: Immediate failover to secondary cluster on read failures
  • Replay System: Asynchronous reconciliation via in-memory queue or NATS JetStream
  • Drop-in Replacement: Interface-based design mirrors gocql API

Basic Usage

// Create sessions for both clusters
gocqlSessionA, _ := clusterA.CreateSession()
gocqlSessionB, _ := clusterB.CreateSession()
sessionA := v1.NewSession(gocqlSessionA)
sessionB := v1.NewSession(gocqlSessionB)

// Create Helix client
client, err := helix.NewCQLClient(sessionA, sessionB,
    helix.WithReadStrategy(policy.NewStickyRead()),
    helix.WithWriteStrategy(policy.NewConcurrentDualWrite()),
)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Use like a normal gocql session
err = client.Query("INSERT INTO users (id, name) VALUES (?, ?)", id, name).Exec()

Error Handling

Helix uses standard Go errors with clear semantics for dual-cluster operations.

Write Operation Errors

Write operations (Exec, ExecContext, batch.Exec) follow this error model:

  • nil: At least one cluster succeeded (partial writes are queued for replay)
  • error: Both clusters failed (operation completely failed)

When both clusters fail, a types.DualClusterError is returned:

err := client.Query("INSERT INTO ...").Exec()
if err != nil {
    var dualErr *types.DualClusterError
    if errors.As(err, &dualErr) {
        // Both clusters failed, inspect individual errors
        log.Printf("Cluster A: %v", dualErr.ErrorA)
        log.Printf("Cluster B: %v", dualErr.ErrorB)
    }
}

Sentinel Errors

Helix defines several sentinel errors for specific scenarios:

  • types.ErrSessionClosed: Operation attempted on closed client
  • types.ErrReplayQueueFull: Replay queue at capacity, cannot enqueue failed write
  • types.ErrNoAvailableCluster: No cluster available for reads (both down/draining)
  • types.ErrBothClustersDraining: Both clusters in drain mode, writes rejected
  • types.ErrWriteAsync: Write sent async to degraded cluster (AdaptiveDualWrite only)
  • types.ErrWriteDropped: Write dropped due to concurrency limit (AdaptiveDualWrite only)

Check for sentinel errors using errors.Is:

if errors.Is(err, types.ErrSessionClosed) {
    // Handle closed session
}

Wrapped Errors

Cluster-specific errors are wrapped in types.ClusterError:

var clusterErr *types.ClusterError
if errors.As(err, &clusterErr) {
    log.Printf("Cluster %s failed during %s: %v",
        clusterErr.Cluster, clusterErr.Operation, clusterErr.Cause)
}

Context and Timeouts

Context usage follows Go idioms. Two equivalent patterns are supported:

// Pattern 1: Direct context method (preferred for simple cases)
err := client.Query("SELECT ...").ExecContext(ctx)

// Pattern 2: Method chaining (useful with multiple options)
err := client.Query("SELECT ...").
    Consistency(helix.Quorum).
    WithContext(ctx).
    Exec()

Both patterns respect context cancellation and deadlines. For dual-writes, each cluster operation gets an independent timeout context to prevent one slow cluster from blocking the other.

Idempotency and Timestamps

Helix uses client-generated timestamps to ensure replay operations are idempotent. Without proper timestamps:

  • Replayed writes may overwrite newer data
  • Last-write-wins semantics may be violated

Always configure a timestamp provider or use WithTimestamp():

helix.WithTimestampProvider(func() int64 {
    return time.Now().UnixMicro()
})
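
As a sketch, a timestamp can also be pinned on a single write via the Query interface's WithTimestamp (documented below); the statement and id value here are illustrative:

ts := time.Now().UnixMicro()
err := client.Query("UPDATE users SET name = ? WHERE id = ?", "Alice", id).
    WithTimestamp(ts).
    Exec()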

CAS/LWT Warning

Lightweight Transactions (INSERT ... IF NOT EXISTS, ScanCAS, etc.) are NOT safe in a shared-nothing dual-cluster architecture. Helix does not guarantee correctness for CAS/LWT operations.

Example (ContextUsage)

Example_contextUsage demonstrates two patterns for context handling in queries.

var client *CQLClient
ctx := context.Background()

// Pattern 1: Direct context method (simple cases)
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").
	ExecContext(ctx)
_ = err

// Pattern 2: Method chaining (multiple options)
err = client.Query("SELECT name FROM users WHERE id = ?", "user-1").
	Consistency(Quorum).
	WithContext(ctx).
	Exec()
_ = err
Example (ErrorHandling)

Example_errorHandling demonstrates inspecting DualClusterError when both clusters fail.

var client *CQLClient

// Perform a write operation
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").Exec()
// Check if both clusters failed
if err != nil {
	var dualErr *types.DualClusterError
	if errors.As(err, &dualErr) {
		// Inspect individual cluster errors
		_ = dualErr.ErrorA // Error from cluster A
		_ = dualErr.ErrorB // Error from cluster B
	}

	// Check for specific sentinel errors
	if errors.Is(err, types.ErrSessionClosed) {
		// Session was closed, recreate client or handle gracefully
		return
	}
}

Constants

const (
	ClusterA = types.ClusterA
	ClusterB = types.ClusterB
)

Re-export cluster ID constants for convenience.

const (
	Any         = types.Any
	One         = types.One
	Two         = types.Two
	Three       = types.Three
	Quorum      = types.Quorum
	All         = types.All
	LocalQuorum = types.LocalQuorum
	EachQuorum  = types.EachQuorum
	Serial      = types.Serial
	LocalSerial = types.LocalSerial
	LocalOne    = types.LocalOne
)

Re-export consistency level constants for convenience.

const (
	LoggedBatch   = types.LoggedBatch
	UnloggedBatch = types.UnloggedBatch
	CounterBatch  = types.CounterBatch
)

Re-export batch type constants for convenience.

const (
	PriorityHigh = types.PriorityHigh
	PriorityLow  = types.PriorityLow
)

Re-export priority level constants for convenience.

Variables

This section is empty.

Functions

func DefaultTimestampProvider

func DefaultTimestampProvider() int64

DefaultTimestampProvider returns the current time in microseconds.

Types

type Batch

type Batch interface {
	// Query adds a statement to the batch.
	//
	// Parameters:
	//   - stmt: CQL statement with ? placeholders
	//   - args: Values to bind to placeholders
	//
	// Returns:
	//   - Batch: The same batch for chaining
	Query(stmt string, args ...any) Batch

	// Consistency sets the consistency level for the batch.
	//
	// Parameters:
	//   - c: Consistency level
	//
	// Returns:
	//   - Batch: The same batch for chaining
	Consistency(c Consistency) Batch

	// SetConsistency sets the consistency level for the batch.
	//
	// Deprecated: Use Consistency() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - c: Consistency level
	SetConsistency(c Consistency)

	// WithContext associates a context with the batch.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - Batch: The same batch for chaining
	WithContext(ctx context.Context) Batch

	// WithTimestamp sets a specific timestamp for all statements in the batch.
	//
	// Parameters:
	//   - ts: Timestamp in microseconds since Unix epoch
	//
	// Returns:
	//   - Batch: The same batch for chaining
	WithTimestamp(ts int64) Batch

	// WithPriority sets the replay priority level for this batch operation.
	//
	// Priority affects replay processing order when a write fails on one cluster.
	// High priority writes are processed before low priority writes.
	// If not set, defaults to PriorityHigh.
	//
	// Parameters:
	//   - p: Priority level (PriorityHigh or PriorityLow)
	//
	// Returns:
	//   - Batch: The same batch for chaining
	WithPriority(p PriorityLevel) Batch

	// SerialConsistency sets the consistency level for the serial phase of CAS operations.
	//
	// This only applies to batch CAS operations.
	// Valid values are Serial or LocalSerial.
	//
	// Parameters:
	//   - c: Serial consistency level (Serial or LocalSerial)
	//
	// Returns:
	//   - Batch: The same batch for chaining
	SerialConsistency(c Consistency) Batch

	// Size returns the number of statements in the batch.
	//
	// Returns:
	//   - int: Number of statements added to the batch
	Size() int

	// Exec executes the batch using the Write Strategy.
	//
	// This triggers concurrent dual-write to both clusters.
	//
	// Returns:
	//   - error: nil on success, error if both clusters fail
	Exec() error

	// ExecContext executes the batch with the given context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - error: nil on success, error if both clusters fail
	ExecContext(ctx context.Context) error

	// IterContext executes the batch with context and returns an iterator.
	//
	// This is useful for accessing execution metadata like latency and attempts.
	// The iterator reads from the selected cluster based on sticky routing.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - Iter: Iterator for accessing batch execution results
	IterContext(ctx context.Context) Iter

	// ExecCAS executes a batch with lightweight transactions.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - dest: Pointers to variables to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	ExecCAS(dest ...any) (applied bool, iter Iter, err error)

	// ExecCASContext executes a batch with lightweight transactions and context.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - dest: Pointers to variables to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	ExecCASContext(ctx context.Context, dest ...any) (applied bool, iter Iter, err error)

	// MapExecCAS executes a batch with lightweight transactions and returns previous values as a map.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - dest: Map to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	MapExecCAS(dest map[string]any) (applied bool, iter Iter, err error)

	// MapExecCASContext executes a batch with lightweight transactions, context, and map result.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - dest: Map to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	MapExecCASContext(ctx context.Context, dest map[string]any) (applied bool, iter Iter, err error)
}

Batch mirrors gocql.Batch for grouping multiple mutations.

All statements in a batch are executed atomically and use the Write Strategy.
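
A minimal usage sketch against this interface (the table and id values are illustrative):

b := client.Batch(helix.UnloggedBatch).
    Query("INSERT INTO events (id, kind) VALUES (?, ?)", id1, "login").
    Query("INSERT INTO events (id, kind) VALUES (?, ?)", id2, "logout").
    WithTimestamp(time.Now().UnixMicro())
if err := b.Exec(); err != nil {
    log.Printf("batch failed on both clusters: %v", err)
}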

type BatchStatement

type BatchStatement = types.BatchStatement

Type aliases for convenience - re-export from types package.

type BatchType

type BatchType = types.BatchType

Type aliases for convenience - re-export from types package.

type CQLClient

type CQLClient struct {
	// contains filtered or unexported fields
}

CQLClient is the main Helix CQL client for single or dual-cluster operations.

It wraps one or two CQL sessions and orchestrates reads and writes according to configured strategies. When only one session is provided (sessionB is nil), the client operates in single-cluster mode with pass-through behavior.

Single-cluster mode is ideal for:

  • Migrating existing applications to Helix incrementally
  • Development and testing environments
  • Applications that don't need dual-cluster redundancy yet

Thread Safety

CQLClient is safe for concurrent use from multiple goroutines. A single client instance can be shared across your application:

// Create once, share everywhere
client, err := helix.NewCQLClient(sessionA, sessionB, ...)
defer client.Close()

// Use from multiple goroutines safely
go func() { client.Query("INSERT ...").Exec() }()
go func() { client.Query("SELECT ...").Scan(&result) }()

All internal state is protected by atomic operations or appropriate locking.

Lifecycle

Create a client with NewCQLClient() and clean up resources with Close():

client, err := helix.NewCQLClient(sessionA, sessionB, opts...)
if err != nil {
    log.Fatal(err)
}
defer client.Close()  // Always close to release resources

After Close() is called:

  • All ongoing operations complete or are cancelled
  • Replay worker is stopped (enqueued replays are lost if using MemoryReplayer)
  • Topology watcher is stopped
  • Underlying sessions are closed
  • The client cannot be reused (operations return ErrSessionClosed)

func NewCQLClient

func NewCQLClient(sessionA, sessionB cql.Session, opts ...Option) (*CQLClient, error)

NewCQLClient creates a new Helix CQL client.

The client supports two modes:

  • Single-cluster mode: Pass sessionB as nil. Operations are executed directly on sessionA without dual-write or failover logic. This provides a drop-in replacement for existing single-cluster applications.
  • Dual-cluster mode: Pass both sessions. Operations use configured strategies for dual writes, sticky reads, and failover.

If a ReplayWorker is configured, it will be started automatically. The worker will be stopped when Close() is called.

If a TopologyWatcher is configured, it will be started automatically to monitor drain mode signals. Writes to draining clusters are skipped and enqueued for replay. Reads are failed over away from draining clusters.

Parameters:

  • sessionA: CQL session for cluster A (required)
  • sessionB: CQL session for cluster B (optional, nil for single-cluster mode)
  • opts: Optional configuration options

Returns:

  • *CQLClient: A new CQL client
  • error: ErrNilSession if sessionA is nil, or error from worker start
Example

ExampleNewCQLClient demonstrates creating a Helix CQL client with custom strategies.

// Note: This example uses nil sessions for illustration purposes.
// In real code, create proper gocql sessions wrapped with v1.NewSession() or v2.NewSession().

// Simulating session creation (replace with actual gocql sessions in real code)
var sessionA, sessionB cql.Session // would be v1.NewSession(gocqlSession)

// Create Helix client with custom strategies
client, err := NewCQLClient(sessionA, sessionB,
	WithReadStrategy(nil),   // e.g., policy.NewStickyRead()
	WithWriteStrategy(nil),  // e.g., policy.NewConcurrentDualWrite()
	WithFailoverPolicy(nil), // e.g., policy.NewActiveFailover()
)
if err != nil {
	// Handle error
	return
}
defer client.Close()

// Client is now ready for dual-cluster operations
_ = client

func (*CQLClient) Batch

func (c *CQLClient) Batch(kind BatchType) Batch

Batch creates a new Batch for grouping multiple mutations.

WARNING: CounterBatch operations are NOT idempotent. If a counter update partially fails (succeeds on one cluster, fails on another), the Replay System will re-apply the operation, causing double-counting. Avoid using CounterBatch with dual-cluster mode if you require exactly-once semantics.

Parameters:

  • kind: Type of batch (Logged, Unlogged, or Counter)

Returns:

  • Batch: A batch builder for adding statements

func (*CQLClient) Close

func (c *CQLClient) Close()

Close terminates connections to cluster(s) and stops the replay worker.

The topology watcher and replay worker are stopped first. After Close is called, the client cannot be reused.

func (*CQLClient) Config

func (c *CQLClient) Config() *ClientConfig

Config returns the current client configuration.

Returns:

  • *ClientConfig: The client's configuration

func (*CQLClient) DefaultExecuteFunc added in v0.5.0

func (c *CQLClient) DefaultExecuteFunc() replay.ExecuteFunc

DefaultExecuteFunc returns an ExecuteFunc for use with replay workers.

This is a convenience method that creates an executor which routes replay payloads to the appropriate cluster session. It handles both single queries and batch operations, preserving the original timestamp for idempotency.

The returned function:

  • Routes to sessionA or sessionB based on payload.TargetCluster
  • Handles batch operations (IsBatch=true) with proper BatchType
  • Preserves the original write timestamp for idempotent replays
  • Respects context cancellation and timeouts via ExecContext

Example:

client, _ := helix.NewCQLClient(sessionA, sessionB, helix.WithReplayer(replayer))

// Create worker with the default executor
worker := replay.NewMemoryWorker(replayer, client.DefaultExecuteFunc(),
    replay.WithOnSuccess(func(p types.ReplayPayload) {
        log.Printf("Replay succeeded for cluster %s", p.TargetCluster)
    }),
)

Returns:

  • replay.ExecuteFunc: A function that executes replay payloads

func (*CQLClient) ExecuteBatch deprecated

func (c *CQLClient) ExecuteBatch(batch Batch) error

ExecuteBatch executes a batch operation.

Deprecated: Use batch.Exec() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.

Parameters:

  • batch: The batch to execute

Returns:

  • error: Any execution error

func (*CQLClient) ExecuteBatchCAS deprecated

func (c *CQLClient) ExecuteBatchCAS(batch Batch, dest ...any) (applied bool, iter Iter, err error)

ExecuteBatchCAS executes a batch with lightweight transaction semantics.

Deprecated: Use batch.ExecCAS() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.

Parameters:

  • batch: The batch to execute
  • dest: Optional destination for result columns

Returns:

  • applied: true if the transaction was applied
  • iter: Iterator for result rows if not applied
  • err: Any execution error

func (*CQLClient) IsDraining

func (c *CQLClient) IsDraining(cluster ClusterID) bool

IsDraining returns whether the specified cluster is currently in drain mode.

When a cluster is draining:

  • Writes to the cluster are skipped and enqueued for replay
  • Reads are failed over to the non-draining cluster

Parameters:

  • cluster: The cluster to check

Returns:

  • bool: true if the cluster is being drained
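
For illustration, a caller might check drain state before routing maintenance traffic:

if client.IsDraining(helix.ClusterA) {
    log.Println("cluster A is draining; its writes are being enqueued for replay")
}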

func (*CQLClient) IsSingleCluster

func (c *CQLClient) IsSingleCluster() bool

IsSingleCluster returns true if the client is operating in single-cluster mode.

In single-cluster mode, all operations are executed directly on the primary session without dual-write or failover logic.

func (*CQLClient) MapExecuteBatchCAS deprecated

func (c *CQLClient) MapExecuteBatchCAS(batch Batch, dest map[string]any) (applied bool, iter Iter, err error)

MapExecuteBatchCAS executes a batch CAS operation and maps results to a map.

Deprecated: Use batch.MapExecCAS() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.

Parameters:

  • batch: The batch to execute
  • dest: Map to receive result columns

Returns:

  • applied: true if the transaction was applied
  • iter: Iterator for result rows if not applied
  • err: Any execution error

func (*CQLClient) NewBatch deprecated

func (c *CQLClient) NewBatch(kind BatchType) Batch

NewBatch creates a new Batch for grouping multiple mutations.

Deprecated: Use Batch() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.

Parameters:

  • kind: Type of batch (Logged, Unlogged, or Counter)

Returns:

  • Batch: A batch builder for adding statements

func (*CQLClient) Query

func (c *CQLClient) Query(stmt string, values ...any) Query

Query creates a new Query for the given statement.

The method called on the returned Query determines the strategy:

  • Exec/ExecContext → Write Strategy (Dual Write)
  • Scan/Iter/MapScan → Read Strategy (Sticky Read)

Parameters:

  • stmt: CQL statement with ? placeholders
  • values: Values to bind to placeholders

Returns:

  • Query: A query builder for further configuration
Example

ExampleCQLClient_Query demonstrates basic query operations with dual-cluster client.

// Note: Assumes client is already created (see ExampleNewCQLClient)
var client *CQLClient

// Write operation - triggers dual-write to both clusters
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").Exec()
if err != nil {
	// Both clusters failed
	return
}

// Read operation - uses sticky read strategy
var name string
err = client.Query("SELECT name FROM users WHERE id = ?", "user-1").Scan(&name)
if err != nil {
	// Read failed (with failover attempted)
	return
}

_ = name

func (*CQLClient) Session added in v0.3.0

func (c *CQLClient) Session() CQLSession

Session returns the CQLClient as a CQLSession interface.

This allows the CQLClient to be used as a drop-in replacement for gocql.Session in code that expects a session-like interface.

Example:

client, _ := helix.NewCQLClient(sessionA, sessionB)
session := client.Session()

// Use session like a regular gocql.Session
err := session.Query("INSERT INTO ...").Exec()

Returns:

  • CQLSession: The client as a CQLSession interface

func (*CQLClient) SessionA

func (c *CQLClient) SessionA() cql.Session

SessionA returns the underlying session for cluster A.

Use with caution - direct access bypasses Helix's dual-cluster logic.

Returns:

  • cql.Session: The raw session for cluster A

func (*CQLClient) SessionB

func (c *CQLClient) SessionB() cql.Session

SessionB returns the underlying session for cluster B.

Use with caution - direct access bypasses Helix's dual-cluster logic.

Returns:

  • cql.Session: The raw session for cluster B

type CQLSession

type CQLSession interface {
	// Query creates a new Query instance for the given statement.
	//
	// The method called on the returned Query determines the strategy:
	//   - Exec/ExecContext: Triggers Write Strategy (Dual Write)
	//   - Scan/Iter/MapScan: Triggers Read Strategy (Sticky Read)
	//
	// Parameters:
	//   - stmt: CQL statement with ? placeholders
	//   - values: Values to bind to placeholders
	//
	// Returns:
	//   - Query: A query builder for further configuration
	Query(stmt string, values ...any) Query

	// Batch creates a new Batch instance for grouping multiple mutations.
	//
	// All statements in a batch are executed atomically against both clusters
	// using the Write Strategy.
	//
	// Parameters:
	//   - kind: Type of batch (Logged, Unlogged, or Counter)
	//
	// Returns:
	//   - Batch: A batch builder for adding statements
	Batch(kind BatchType) Batch

	// NewBatch creates a new Batch instance for grouping multiple mutations.
	//
	// Deprecated: Use Batch() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - kind: Type of batch (Logged, Unlogged, or Counter)
	//
	// Returns:
	//   - Batch: A batch builder for adding statements
	NewBatch(kind BatchType) Batch

	// ExecuteBatch executes a batch of statements.
	//
	// Deprecated: Use Batch().Exec() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - batch: The batch to execute
	//
	// Returns:
	//   - error: nil on success, error if execution fails
	ExecuteBatch(batch Batch) error

	// ExecuteBatchCAS executes a batch lightweight transaction.
	//
	// Deprecated: Use Batch().ExecCAS() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - batch: The batch to execute
	//   - dest: Pointers to variables to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	ExecuteBatchCAS(batch Batch, dest ...any) (applied bool, iter Iter, err error)

	// MapExecuteBatchCAS executes a batch lightweight transaction and scans into a map.
	//
	// Deprecated: Use Batch().MapExecCAS() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - batch: The batch to execute
	//   - dest: Map to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - iter: Iterator for scanning additional CAS result rows
	//   - err: error if the operation failed
	MapExecuteBatchCAS(batch Batch, dest map[string]any) (applied bool, iter Iter, err error)

	// Close terminates connections to both clusters.
	//
	// After Close is called, the session cannot be reused.
	Close()
}

CQLSession mirrors gocql.Session and provides a drop-in replacement interface.

This interface abstracts over different gocql versions (v1 and v2) and allows Helix to perform dual-cluster operations transparently.

type ClientConfig

type ClientConfig struct {
	ReadStrategy      ReadStrategy
	WriteStrategy     WriteStrategy
	FailoverPolicy    FailoverPolicy
	Replayer          Replayer
	ReplayWorker      ReplayWorker
	TimestampProvider TimestampProvider
	TopologyWatcher   TopologyWatcher
	Metrics           MetricsCollector
	Logger            types.Logger
	ClusterNames      types.ClusterNames
	OnReplayDropped   ReplayDroppedHandler

	// AutoMemoryWorker enables automatic in-process replay with MemoryReplayer.
	// When true, a MemoryReplayer and Worker are created automatically.
	AutoMemoryWorker bool

	// AutoMemoryCapacity is the queue capacity for auto-created MemoryReplayer.
	// Default: 10000
	AutoMemoryCapacity int

	// AutoMemoryWorkerOpts are options passed to the auto-created Worker.
	AutoMemoryWorkerOpts []replay.WorkerOption
}

ClientConfig holds configuration for Helix clients.

func DefaultConfig

func DefaultConfig() *ClientConfig

DefaultConfig returns a ClientConfig with sensible defaults.

The default configuration provides minimal, non-nil infrastructure:

  • TimestampProvider: Uses time.Now().UnixMicro() for idempotent writes
  • Metrics: No-op collector (silent, no overhead)
  • Logger: No-op logger (silent, no overhead)
  • ClusterNames: "ClusterA" and "ClusterB"

Strategy and policy defaults (all nil):

  • ReadStrategy: nil - Falls back to ClusterA only (no load balancing)
  • WriteStrategy: nil - Concurrent dual-write to both clusters
  • FailoverPolicy: nil - Always attempts failover on read failure
  • Replayer: nil - CAUTION: Partial write failures will be lost!

Production recommendations:

  • ReadStrategy: policy.NewStickyRead() for cache-efficient reads
  • WriteStrategy: policy.NewAdaptiveDualWrite() for latency-aware writes
  • FailoverPolicy: policy.NewActiveFailover() for immediate failover
  • Replayer: replay.NewNATSReplayer() for durable failure recovery

A warning is logged during client creation if dual-cluster mode is used without a Replayer configured.

Returns:

  • *ClientConfig: Configuration with default settings

type ClusterID

type ClusterID = types.ClusterID

Type aliases for convenience - re-export from types package.

type ClusterNames

type ClusterNames = types.ClusterNames

Type aliases for convenience - re-export from types package.

type ColumnInfo

type ColumnInfo struct {
	// Keyspace is the keyspace containing the table.
	Keyspace string

	// Table is the table name.
	Table string

	// Name is the column name.
	Name string

	// TypeInfo describes the CQL type (implementation-specific).
	TypeInfo any
}

ColumnInfo holds metadata about a column in query results.

type Consistency

type Consistency = types.Consistency

Type aliases for convenience - re-export from types package.

type FailoverPolicy

type FailoverPolicy interface {
	// ShouldFailover determines if failover should occur.
	//
	// Parameters:
	//   - cluster: The cluster that failed
	//   - err: The error that occurred
	//
	// Returns:
	//   - bool: true if failover should be attempted
	ShouldFailover(cluster ClusterID, err error) bool

	// RecordFailure records a failure for circuit breaker logic.
	//
	// Parameters:
	//   - cluster: The cluster that failed
	RecordFailure(cluster ClusterID)

	// RecordSuccess records a success to reset failure counters.
	//
	// Parameters:
	//   - cluster: The cluster that succeeded
	RecordSuccess(cluster ClusterID)
}

FailoverPolicy controls when and how failover occurs.

Implementations MUST be safe for concurrent use from multiple goroutines. RecordFailure/RecordSuccess may be called concurrently while ShouldFailover is evaluating the current state.
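
A minimal custom policy satisfying this interface might look like the sketch below; it is illustrative, not the built-in policy.ActiveFailover:

type alwaysFailover struct{}

func (alwaysFailover) ShouldFailover(cluster helix.ClusterID, err error) bool {
    return err != nil // fail over on any error
}
func (alwaysFailover) RecordFailure(cluster helix.ClusterID) {} // stateless: nothing to record
func (alwaysFailover) RecordSuccess(cluster helix.ClusterID) {}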

type Iter

type Iter interface {
	// Scan reads the next row into dest.
	//
	// Parameters:
	//   - dest: Pointers to variables to receive column values
	//
	// Returns:
	//   - bool: true if a row was read, false if no more rows
	Scan(dest ...any) bool

	// Close closes the iterator and returns any error.
	//
	// Returns:
	//   - error: Any error that occurred during iteration
	Close() error

	// MapScan reads the next row into the map.
	//
	// Parameters:
	//   - m: Map to receive column name → value pairs
	//
	// Returns:
	//   - bool: true if a row was read, false if no more rows
	MapScan(m map[string]any) bool

	// SliceMap reads all remaining rows into a slice of maps.
	//
	// Returns:
	//   - []map[string]any: All remaining rows
	//   - error: Any error that occurred
	SliceMap() ([]map[string]any, error)

	// PageState returns the pagination token for resuming iteration.
	//
	// Returns:
	//   - []byte: Opaque pagination token
	PageState() []byte

	// NumRows returns the number of rows in the current page.
	//
	// Note: This only reflects rows in the current page, not total results.
	// The value updates when new pages are fetched.
	//
	// Returns:
	//   - int: Number of rows in current page
	NumRows() int

	// Columns returns metadata about the columns in the result set.
	//
	// Returns:
	//   - []ColumnInfo: Slice of column metadata
	Columns() []ColumnInfo

	// Scanner returns a database/sql-style scanner for the iterator.
	//
	// After calling Scanner, the iterator should not be used directly.
	//
	// Returns:
	//   - Scanner: Scanner for row-by-row processing
	Scanner() Scanner

	// Warnings returns any warnings from the Cassandra server.
	//
	// Available in CQL protocol v4 and above.
	//
	// Returns:
	//   - []string: Server warning messages
	Warnings() []string
}

Iter mirrors gocql.Iter for iterating over query results.
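
A typical iteration sketch (the query and columns are illustrative):

iter := client.Query("SELECT id, name FROM users").Iter()
var id, name string
for iter.Scan(&id, &name) {
    fmt.Println(id, name)
}
if err := iter.Close(); err != nil {
    log.Printf("iteration failed: %v", err)
}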

type LatencyRecorder

type LatencyRecorder interface {
	// RecordLatency records the latency of a successful operation.
	//
	// Implementations may treat slow responses (above a threshold) as failures
	// for circuit breaker purposes.
	//
	// Parameters:
	//   - cluster: The cluster that was accessed
	//   - latency: The operation duration
	RecordLatency(cluster ClusterID, latency time.Duration)
}

LatencyRecorder is an optional interface for failover policies that track latency.

Policies implementing this interface will have RecordLatency called automatically by the client after successful operations. This enables latency-aware circuit breaking where slow responses are treated as "soft failures".

Implementations MUST be safe for concurrent use from multiple goroutines.

Example implementation: policy.LatencyCircuitBreaker
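
As a sketch, a policy opts in simply by adding a RecordLatency method; the threshold and counter below are illustrative, and a real policy would also implement FailoverPolicy:

type slowAware struct {
    threshold time.Duration
    slowCount atomic.Int64
}

func (s *slowAware) RecordLatency(cluster helix.ClusterID, latency time.Duration) {
    if latency > s.threshold {
        s.slowCount.Add(1) // count slow responses as soft failures
    }
}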

type Logger

type Logger = types.Logger

Type aliases for convenience - re-export from types package.

type MetricsCollector

type MetricsCollector = types.MetricsCollector

Type aliases for convenience - re-export from types package.

type Option

type Option func(*ClientConfig)

Option configures a ClientConfig.

func WithAutoMemoryWorker added in v0.5.0

func WithAutoMemoryWorker(queueCapacity int, workerOpts ...replay.WorkerOption) Option

WithAutoMemoryWorker enables automatic in-process replay with MemoryReplayer.

This is a convenience option that creates both a MemoryReplayer and a Worker automatically, eliminating boilerplate setup code. The worker uses the client's DefaultExecuteFunc() to route replays to the correct cluster.

Use this for:

  • Development and testing environments
  • Simple deployments where in-process replay is acceptable

For production with durable replay, use WithReplayer() with NATSReplayer instead. NATS workers typically run as separate consumer services.

Parameters:

  • queueCapacity: Maximum pending replays (0 uses default of 10000)
  • workerOpts: Optional worker configuration (poll interval, callbacks, etc.)

Returns:

  • Option: Configuration option

Example:

// Simple setup with defaults
client, _ := helix.NewCQLClient(sessionA, sessionB,
    helix.WithAutoMemoryWorker(0),
)

// Custom capacity and callbacks
client, _ := helix.NewCQLClient(sessionA, sessionB,
    helix.WithAutoMemoryWorker(50000,
        replay.WithPollInterval(50*time.Millisecond),
        replay.WithOnSuccess(func(p types.ReplayPayload) {
            log.Printf("Replay succeeded: cluster=%s", p.TargetCluster)
        }),
    ),
)

func WithClusterNames

func WithClusterNames(nameA, nameB string) Option

WithClusterNames sets custom display names for the clusters.

These names are used in metrics labels and log messages instead of the default "A" and "B". Names must be Prometheus-compatible (alphanumeric with underscores, starting with letter or underscore, max 32 chars).

If not set, defaults to "A" and "B".

Parameters:

  • nameA: Display name for cluster A (e.g., "us_east", "primary", "dc1")
  • nameB: Display name for cluster B (e.g., "us_west", "secondary", "dc2")

Returns:

  • Option: Configuration option

Example:

client, _ := helix.NewCQLClient(sessionA, sessionB,
    helix.WithClusterNames("us_east", "us_west"),
)

This will produce metrics like:

helix_read_total{cluster="us_east"} 100
helix_read_total{cluster="us_west"} 95

func WithFailoverPolicy

func WithFailoverPolicy(policy FailoverPolicy) Option

WithFailoverPolicy sets the failover policy for reads.

Parameters:

  • policy: The failover policy to use

Returns:

  • Option: Configuration option

func WithLogger

func WithLogger(logger types.Logger) Option

WithLogger sets the structured logger.

If not set, a no-op logger is used that discards all messages. The logger interface is compatible with zap.SugaredLogger.

Parameters:

  • logger: The logger implementation

Returns:

  • Option: Configuration option

Example:

logger, _ := zap.NewProduction()
client, _ := helix.NewCQLClient(sessionA, sessionB,
    helix.WithLogger(logger.Sugar()),
)

func WithMetrics

func WithMetrics(collector MetricsCollector) Option

WithMetrics sets the metrics collector.

If not set, a no-op collector is used that discards all metrics. Use contrib/metrics/vm.New() for VictoriaMetrics integration.

Parameters:

  • collector: The metrics collector implementation

Returns:

  • Option: Configuration option

Example:

import vmmetrics "github.com/arloliu/helix/contrib/metrics/vm"

collector := vmmetrics.New(vmmetrics.WithPrefix("myapp"))
client, _ := helix.NewCQLClient(sessionA, sessionB,
    helix.WithMetrics(collector),
)

func WithOnReplayDropped

func WithOnReplayDropped(handler ReplayDroppedHandler) Option

WithOnReplayDropped sets a callback for when replay payloads cannot be enqueued.

This callback is invoked when a write succeeds on one cluster but fails on the other, and the failed write cannot be enqueued for replay (e.g., queue is full). Use this to implement custom alerting or fallback persistence strategies.

Parameters:

  • handler: Function called with the dropped payload and the enqueue error

Returns:

  • Option: Configuration option

Example:

helix.WithOnReplayDropped(func(payload types.ReplayPayload, err error) {
    log.Error("replay queue full, potential data loss",
        "cluster", payload.TargetCluster,
        "query", payload.Query,
        "error", err,
    )
    // Optionally persist to a fallback store
    fallbackStore.Save(payload)
})

func WithReadStrategy

func WithReadStrategy(strategy ReadStrategy) Option

WithReadStrategy sets the read routing strategy.

Parameters:

  • strategy: The read strategy to use (e.g., StickyRead)

Returns:

  • Option: Configuration option

func WithReplayWorker

func WithReplayWorker(worker ReplayWorker) Option

WithReplayWorker sets the replay worker for processing failed writes.

The worker will be started automatically when the client is created and stopped when the client is closed.

Parameters:

  • worker: The replay worker implementation (e.g., MemoryWorker, NATSWorker)

Returns:

  • Option: Configuration option

func WithReplayer

func WithReplayer(replayer Replayer) Option

WithReplayer sets the replayer for failed writes.

Parameters:

  • replayer: The replayer implementation

Returns:

  • Option: Configuration option

func WithTimestampProvider

func WithTimestampProvider(fn TimestampProvider) Option

WithTimestampProvider sets the timestamp generator.

Parameters:

  • fn: Function that returns current timestamp in microseconds

Returns:

  • Option: Configuration option

func WithTopologyWatcher

func WithTopologyWatcher(watcher TopologyWatcher) Option

WithTopologyWatcher sets the topology watcher for drain mode support.

Parameters:

  • watcher: The topology watcher implementation

Returns:

  • Option: Configuration option

func WithWriteStrategy

func WithWriteStrategy(strategy WriteStrategy) Option

WithWriteStrategy sets the write execution strategy.

Parameters:

  • strategy: The write strategy to use (e.g., ConcurrentDualWrite)

Returns:

  • Option: Configuration option

type PriorityLevel

type PriorityLevel = types.PriorityLevel

Type aliases for convenience - re-export from types package.

type Query

type Query interface {
	// WithContext associates a context with the query.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - Query: The same query for chaining
	WithContext(ctx context.Context) Query

	// Consistency sets the consistency level for the query.
	//
	// Parameters:
	//   - c: Consistency level
	//
	// Returns:
	//   - Query: The same query for chaining
	Consistency(c Consistency) Query

	// SetConsistency sets the consistency level for the query.
	//
	// Deprecated: Use Consistency() instead. This method exists for gocql v1 compatibility.
	//
	// Parameters:
	//   - c: Consistency level
	SetConsistency(c Consistency)

	// PageSize sets the number of rows to fetch per page.
	//
	// Parameters:
	//   - n: Number of rows per page
	//
	// Returns:
	//   - Query: The same query for chaining
	PageSize(n int) Query

	// PageState sets the pagination state for resuming iteration.
	//
	// Parameters:
	//   - state: Opaque pagination token from a previous Iter.PageState()
	//
	// Returns:
	//   - Query: The same query for chaining
	PageState(state []byte) Query

	// WithTimestamp sets a specific timestamp for the write operation.
	//
	// This is critical for idempotency in dual-write scenarios.
	// If not set, the client's TimestampProvider is used.
	//
	// Parameters:
	//   - ts: Timestamp in microseconds since Unix epoch
	//
	// Returns:
	//   - Query: The same query for chaining
	WithTimestamp(ts int64) Query

	// WithPriority sets the replay priority level for this write operation.
	//
	// Priority affects replay processing order when a write fails on one cluster.
	// High priority writes are processed before low priority writes.
	// If not set, defaults to PriorityHigh.
	//
	// Parameters:
	//   - p: Priority level (PriorityHigh or PriorityLow)
	//
	// Returns:
	//   - Query: The same query for chaining
	WithPriority(p PriorityLevel) Query

	// SerialConsistency sets the consistency level for the serial phase of CAS operations.
	//
	// This only applies to conditional updates (INSERT/UPDATE with IF clause).
	// Valid values are Serial or LocalSerial.
	//
	// Parameters:
	//   - c: Serial consistency level (Serial or LocalSerial)
	//
	// Returns:
	//   - Query: The same query for chaining
	SerialConsistency(c Consistency) Query

	// Exec executes a write query using the Write Strategy.
	//
	// This triggers concurrent dual-write to both clusters.
	// Returns nil if at least one cluster succeeds (partial writes
	// are handled via the Replayer).
	//
	// Returns:
	//   - error: nil on success (at least one cluster), error if both fail
	Exec() error

	// ExecContext executes a write query with the given context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - error: nil on success, error if both clusters fail
	ExecContext(ctx context.Context) error

	// Scan executes a read query and scans a single row into dest.
	//
	// This triggers the Read Strategy (Sticky Read with failover).
	//
	// Parameters:
	//   - dest: Pointers to variables to receive column values
	//
	// Returns:
	//   - error: nil on success, error if read fails on all clusters
	Scan(dest ...any) error

	// ScanContext executes a read query with context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - dest: Pointers to variables to receive column values
	//
	// Returns:
	//   - error: nil on success, error if read fails
	ScanContext(ctx context.Context, dest ...any) error

	// Iter returns an iterator for reading multiple rows.
	//
	// This triggers the Read Strategy. The iterator reads from
	// the selected cluster based on sticky routing and failover.
	//
	// Returns:
	//   - Iter: Iterator for scanning rows
	Iter() Iter

	// IterContext returns an iterator with the given context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//
	// Returns:
	//   - Iter: Iterator for scanning rows
	IterContext(ctx context.Context) Iter

	// MapScan executes a read and scans a single row into the map.
	//
	// Parameters:
	//   - m: Map to receive column name → value pairs
	//
	// Returns:
	//   - error: nil on success, error if read fails
	MapScan(m map[string]any) error

	// MapScanContext executes a read with context and scans a single row into the map.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - m: Map to receive column name → value pairs
	//
	// Returns:
	//   - error: nil on success, error if read fails
	MapScanContext(ctx context.Context, m map[string]any) error

	// ScanCAS executes a lightweight transaction (INSERT/UPDATE with IF clause).
	//
	// If the transaction fails because conditions weren't met, the previous
	// values are stored in dest.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - dest: Pointers to variables to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - err: error if the operation failed
	ScanCAS(dest ...any) (applied bool, err error)

	// ScanCASContext executes a lightweight transaction with context.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - dest: Pointers to variables to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - err: error if the operation failed
	ScanCASContext(ctx context.Context, dest ...any) (applied bool, err error)

	// MapScanCAS executes a lightweight transaction and returns previous values as a map.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - dest: Map to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - err: error if the operation failed
	MapScanCAS(dest map[string]any) (applied bool, err error)

	// MapScanCASContext executes a lightweight transaction with context.
	//
	// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout
	//   - dest: Map to receive previous values if not applied
	//
	// Returns:
	//   - applied: true if the CAS operation was applied
	//   - err: error if the operation failed
	MapScanCASContext(ctx context.Context, dest map[string]any) (applied bool, err error)
}

Query mirrors gocql.Query and provides chainable configuration.

The method called to execute the query determines whether it's a read or write:

  • Exec/ExecContext → Write Strategy (Dual Write)
  • Scan/Iter/MapScan → Read Strategy (Sticky Read)

Context Usage Patterns

Two equivalent patterns are supported for context handling:

// Pattern 1: Direct context method (recommended for simple cases)
err := query.ExecContext(ctx)
result := query.ScanContext(ctx, &dest)

// Pattern 2: Method chaining (useful with multiple options)
err := query.Consistency(helix.Quorum).WithContext(ctx).Exec()

Both patterns respect context cancellation and timeouts. Choose Pattern 1 when you only need to set a context. Choose Pattern 2 when chaining multiple configuration calls (consistency, page size, timestamps, etc.).

type ReadStrategy

type ReadStrategy interface {
	// Select chooses which cluster to read from.
	//
	// Parameters:
	//   - ctx: Context for the operation
	//
	// Returns:
	//   - ClusterID: The cluster to read from
	Select(ctx context.Context) ClusterID

	// OnSuccess is called when a read succeeds.
	//
	// Parameters:
	//   - cluster: The cluster that succeeded
	OnSuccess(cluster ClusterID)

	// OnFailure is called when a read fails.
	//
	// Parameters:
	//   - cluster: The cluster that failed
	//   - err: The error that occurred
	//
	// Returns:
	//   - ClusterID: Alternative cluster to try, or empty if no failover
	//   - bool: true if failover should be attempted
	OnFailure(cluster ClusterID, err error) (ClusterID, bool)
}

ReadStrategy defines how reads are routed to clusters.

Implementations MUST be safe for concurrent use from multiple goroutines. All methods may be called concurrently from different operations.
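
A minimal strategy satisfying this interface could look like the sketch below; it is illustrative, not the built-in policy.PrimaryOnlyRead:

type primaryFirst struct{}

func (primaryFirst) Select(ctx context.Context) helix.ClusterID { return helix.ClusterA }
func (primaryFirst) OnSuccess(cluster helix.ClusterID)          {}

func (primaryFirst) OnFailure(cluster helix.ClusterID, err error) (helix.ClusterID, bool) {
    if cluster == helix.ClusterA {
        return helix.ClusterB, true // try the other cluster once
    }
    return cluster, false // both clusters failed; give up
}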

type ReplayDroppedHandler

type ReplayDroppedHandler func(payload types.ReplayPayload, err error)

ReplayDroppedHandler is called when a replay payload cannot be enqueued. This callback allows applications to handle potential data loss scenarios.

Parameters:

  • payload: The payload that could not be enqueued
  • err: The error from the enqueue attempt

type ReplayPayload

type ReplayPayload = types.ReplayPayload

Type aliases for convenience - re-export from types package.

type ReplayWorker

type ReplayWorker interface {
	// Start begins processing replay messages in background goroutines.
	//
	// Returns:
	//   - error: ErrWorkerAlreadyRunning if already started
	Start() error

	// Stop gracefully stops the worker and waits for pending work to complete.
	Stop()

	// IsRunning returns whether the worker is currently running.
	IsRunning() bool
}

ReplayWorker processes failed writes from a replay queue.

Implementations include MemoryWorker and NATSWorker from the replay package.

type Replayer

type Replayer interface {
	// Enqueue adds a failed write to the replay queue.
	//
	// Parameters:
	//   - ctx: Context for cancellation
	//   - payload: The write operation to replay
	//
	// Returns:
	//   - error: nil on success, error if enqueue fails
	Enqueue(ctx context.Context, payload ReplayPayload) error
}

Replayer handles asynchronous reconciliation of failed writes.

When a dual-write partially fails (one cluster succeeds, one fails), the failed write is enqueued for later replay.

Implementations MUST be safe for concurrent use from multiple goroutines. Enqueue may be called concurrently from different write operations.
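
A minimal custom Replayer sketch that only logs lost writes; use replay.NewMemoryReplayer or replay.NewNATSReplayer for real recovery:

type logReplayer struct{}

func (logReplayer) Enqueue(ctx context.Context, payload helix.ReplayPayload) error {
    // payload fields follow the usage shown in WithOnReplayDropped above
    log.Printf("lost write for cluster %v: %s", payload.TargetCluster, payload.Query)
    return nil
}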

type SQLClient

type SQLClient struct {
	// contains filtered or unexported fields
}

SQLClient is the Helix SQL client for single or dual-database operations.

It wraps one or two SQL databases and orchestrates writes according to configured strategies. When only one database is provided (secondary is nil), the client operates in single-database mode with pass-through behavior.

Single-database mode is ideal for:

  • Migrating existing applications to Helix incrementally
  • Development and testing environments
  • Applications that don't need dual-database redundancy yet

Limitations (dual-database mode):

  • Single-statement non-transactional operations only
  • No transaction support across databases
  • Best-effort semantics (no strong consistency guarantees)
  • Write order is determined by arrival time (SQL has no equivalent of Cassandra's USING TIMESTAMP)

func NewSQLClient

func NewSQLClient(primary, secondary sqladapter.DB, opts ...Option) (*SQLClient, error)

NewSQLClient creates a new Helix SQL client.

The client supports two modes:

  • Single-database mode: Pass secondary as nil. Operations are executed directly on primary without dual-write or failover logic. This provides a drop-in replacement for existing single-database applications.
  • Dual-database mode: Pass both databases. Operations use configured strategies for dual writes, sticky reads, and failover.

If a ReplayWorker is configured, it will be started automatically. The worker will be stopped when Close() is called.

Parameters:

  • primary: Primary database connection (required)
  • secondary: Secondary database connection (optional, nil for single-database mode)
  • opts: Optional configuration options

Returns:

  • *SQLClient: A new SQL client
  • error: ErrNilSession if primary is nil, or error from worker start

func NewSQLClientFromDB

func NewSQLClientFromDB(primary, secondary *sql.DB, opts ...Option) (*SQLClient, error)

NewSQLClientFromDB creates a new Helix SQL client from standard *sql.DB connections.

This is a convenience constructor that wraps the databases in adapters. Pass secondary as nil for single-database mode.

Parameters:

  • primary: Primary *sql.DB connection (required)
  • secondary: Secondary *sql.DB connection (optional, nil for single-database mode)
  • opts: Optional configuration options

Returns:

  • *SQLClient: A new SQL client
  • error: ErrNilSession if primary is nil
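A sketch of both modes; the driver name and connection strings are placeholders:

primary, err := sql.Open("postgres", "postgres://db-a.example.com/myapp")
if err != nil {
	log.Fatal(err)
}
secondary, err := sql.Open("postgres", "postgres://db-b.example.com/myapp")
if err != nil {
	log.Fatal(err)
}

// Dual-database mode: writes fan out to both, reads follow the ReadStrategy.
client, err := helix.NewSQLClientFromDB(primary, secondary)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Single-database mode instead: helix.NewSQLClientFromDB(primary, nil)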

func (*SQLClient) Close

func (c *SQLClient) Close() error

Close closes database connection(s) and stops the replay worker.

The replay worker is stopped first to allow any pending replays to complete. After Close is called, the client cannot be reused.

func (*SQLClient) Exec

func (c *SQLClient) Exec(query string, args ...any) (sql.Result, error)

Exec executes a write query using a background context.

Parameters:

  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • sql.Result: Result from the successful write
  • error: nil if at least one succeeds, error if both fail

func (*SQLClient) ExecContext

func (c *SQLClient) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

ExecContext executes a write query.

In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is executed on both databases concurrently. Success is defined as at least one database succeeding. Failed writes are enqueued for replay if a Replayer is configured.

Parameters:

  • ctx: Context for cancellation and timeout
  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • sql.Result: Result from the successful write (primary preferred)
  • error: nil if at least one succeeds, error if both fail
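For example (placeholder schema; parameter style depends on your driver), a partial failure still returns nil here; only a failure on both databases surfaces as an error:

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_, err := client.ExecContext(ctx,
	"UPDATE users SET email = ? WHERE id = ?", "alice@example.com", 42,
)
if err != nil {
	// Both databases rejected the write.
	log.Printf("write failed everywhere: %v", err)
}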

func (*SQLClient) IsSingleCluster

func (c *SQLClient) IsSingleCluster() bool

IsSingleCluster returns true if the client is operating in single-database mode.

In single-database mode, all operations are executed directly on the primary database without dual-write or failover logic.

func (*SQLClient) Ping

func (c *SQLClient) Ping() error

Ping checks if the database(s) are reachable.

Returns:

  • error: nil if at least one database responds, error if all fail

func (*SQLClient) PingContext

func (c *SQLClient) PingContext(ctx context.Context) error

PingContext checks if the database(s) are reachable.

In single-database mode, only the primary database is pinged. In dual-database mode, both databases are pinged concurrently.

Parameters:

  • ctx: Context for cancellation and timeout

Returns:

  • error: nil if at least one database responds, error if all fail

func (*SQLClient) Query

func (c *SQLClient) Query(query string, args ...any) (*sql.Rows, error)

Query executes a read query using a background context.

Parameters:

  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • *sql.Rows: Result rows from the query
  • error: Error if the query fails on all databases

func (*SQLClient) QueryContext

func (c *SQLClient) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

QueryContext executes a read query.

In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is routed to the preferred database based on the ReadStrategy. On failure, it may failover to the secondary.

Parameters:

  • ctx: Context for cancellation and timeout
  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • *sql.Rows: Result rows from the query (caller must close)
  • error: Error if the query fails on all databases
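Typical usage inside a helper that returns error; the rows must be closed by the caller:

rows, err := client.QueryContext(ctx, "SELECT id, name FROM users")
if err != nil {
	return err
}
defer rows.Close() // caller must close

for rows.Next() {
	var id int64
	var name string
	if err := rows.Scan(&id, &name); err != nil {
		return err
	}
}
return rows.Err()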

func (*SQLClient) QueryRow

func (c *SQLClient) QueryRow(query string, args ...any) *sql.Row

QueryRow executes a read query expecting at most one row.

Parameters:

  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • *sql.Row: Single row result

func (*SQLClient) QueryRowContext

func (c *SQLClient) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

QueryRowContext executes a read query expecting at most one row.

In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is routed based on the ReadStrategy.

If the client is closed, returns a Row that will return sql.ErrNoRows on Scan. Callers should check IsClosed() before calling if they need to distinguish between "no rows" and "client closed" scenarios.

Parameters:

  • ctx: Context for cancellation and timeout
  • query: SQL statement to execute
  • args: Arguments for the query

Returns:

  • *sql.Row: Single row result (never nil)
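A sketch of the closed-client caveat above, following the suggestion to check IsClosed() first:

if client.IsClosed() {
	return errors.New("helix client closed")
}
var email string
err := client.QueryRowContext(ctx,
	"SELECT email FROM users WHERE id = ?", 42,
).Scan(&email)
if errors.Is(err, sql.ErrNoRows) {
	// Genuinely no matching row: the client was open when we asked.
}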

type Scanner

type Scanner interface {
	// Next advances to the next row, returning true if a row is available.
	Next() bool

	// Scan reads the current row into dest.
	Scan(dest ...any) error

	// Err returns any error from iteration and releases resources.
	Err() error
}

Scanner provides database/sql-style row scanning.
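The intended iteration pattern, assuming scanner came from a query and the enclosing function returns error; Err must be called after the loop because it also releases resources:

for scanner.Next() {
	var id int64
	if err := scanner.Scan(&id); err != nil {
		return err
	}
}
return scanner.Err() // reports iteration errors and releases resources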

type TimestampProvider

type TimestampProvider func() int64

TimestampProvider generates timestamps for write operations.

The default provider uses time.Now().UnixMicro().
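A deterministic provider can be useful in tests (a sketch; production code would normally keep the default):

var fixed helix.TimestampProvider = func() int64 {
	return time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).UnixMicro()
}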

type TopologyOperator

type TopologyOperator interface {
	// SetDrain sets the drain state for a cluster.
	//
	// Parameters:
	//   - ctx: Context for cancellation/timeout
	//   - cluster: The cluster to update
	//   - draining: true to enable drain mode, false to disable
	//   - reason: Human-readable reason for the drain (only used when draining=true)
	//
	// Returns:
	//   - error: nil on success, error if the operation fails
	SetDrain(ctx context.Context, cluster ClusterID, draining bool, reason string) error
}

TopologyOperator allows setting cluster drain states.

This interface is typically used by operations tools and tests to control cluster availability. Implementations include topology.Local (in-memory).
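A hypothetical maintenance sequence, where op is a TopologyOperator such as topology.Local and clusterB is a ClusterID from your configuration:

// Drain cluster B ahead of a rolling upgrade...
if err := op.SetDrain(ctx, clusterB, true, "rolling upgrade"); err != nil {
	log.Fatalf("drain: %v", err)
}

// ...perform maintenance, then return it to service.
// (reason is ignored when draining=false)
if err := op.SetDrain(ctx, clusterB, false, ""); err != nil {
	log.Fatalf("undrain: %v", err)
}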

type TopologyUpdate

type TopologyUpdate struct {
	// Cluster that was updated.
	Cluster ClusterID

	// Available indicates if the cluster is available.
	Available bool

	// DrainMode indicates if the cluster is in drain mode.
	DrainMode bool
}

TopologyUpdate represents a change in cluster topology.

type TopologyWatcher

type TopologyWatcher interface {
	// Watch returns a channel that receives topology updates.
	//
	// Parameters:
	//   - ctx: Context for cancellation
	//
	// Returns:
	//   - <-chan TopologyUpdate: Channel of topology changes
	Watch(ctx context.Context) <-chan TopologyUpdate
}

TopologyWatcher monitors cluster topology changes.

Implementations include topology.Local (in-memory) and topology.NATS (NATS KV backed).
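A consumption sketch; whether the channel closes when ctx is cancelled is an assumption here, so the loop also watches ctx directly:

updates := watcher.Watch(ctx)
for {
	select {
	case u, ok := <-updates:
		if !ok {
			return // channel closed (assumed to happen on cancellation)
		}
		log.Printf("cluster %v: available=%v drain=%v", u.Cluster, u.Available, u.DrainMode)
	case <-ctx.Done():
		return
	}
}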

type WriteStrategy

type WriteStrategy interface {
	// Execute performs the write operation on both clusters.
	//
	// Parameters:
	//   - ctx: Context for the operation
	//   - writeA: Function to write to cluster A
	//   - writeB: Function to write to cluster B
	//
	// Returns:
	//   - resultA: Error from cluster A (nil if successful)
	//   - resultB: Error from cluster B (nil if successful)
	Execute(
		ctx context.Context,
		writeA func(context.Context) error,
		writeB func(context.Context) error,
	) (resultA, resultB error)
}

WriteStrategy defines how writes are executed across clusters.

Implementations MUST be safe for concurrent use from multiple goroutines. Execute may be called concurrently from different write operations.
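For contrast with concurrent dual writes, a minimal sketch of a sequential alternative (primary first, then secondary); it is stateless and therefore safe for concurrent use:

type sequentialWrite struct{}

func (sequentialWrite) Execute(
	ctx context.Context,
	writeA func(context.Context) error,
	writeB func(context.Context) error,
) (resultA, resultB error) {
	// Write to A first; B still runs even if A fails, so the client can
	// detect a partial failure and queue the losing side for replay.
	resultA = writeA(ctx)
	resultB = writeB(ctx)
	return resultA, resultB
}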

Directories

Path Synopsis
adapter
cql
Package cql provides CQL-specific adapter interfaces for different gocql versions.
cql/v1
Package v1 provides an adapter for gocql v1 (github.com/gocql/gocql).
cql/v2
Package v2 provides an adapter for gocql v2 (github.com/apache/cassandra-gocql-driver).
sql
Package sql provides SQL-specific adapter interfaces for database/sql.
contrib
metrics/vm
Package vm provides a VictoriaMetrics-based implementation of the MetricsCollector interface.
examples
basic command
Package main demonstrates basic usage of the Helix dual-database client library.
custom-strategy command
Package main demonstrates how to implement custom strategies for Helix.
failover command
Package main demonstrates failover behavior in the Helix dual-database client.
replay command
Package main demonstrates the Helix replay system for handling partial write failures.
internal
logging
Package logging provides internal logging utilities for Helix.
metrics
Package metrics provides internal metrics utilities for Helix.
policy
Package policy provides read strategies, write strategies, and failover policies for the helix dual-database client.
replay
Package replay provides asynchronous reconciliation mechanisms for failed writes in the helix dual-database client.
test
simulation/cmd command
testutil
Package testutil provides testing utilities for the helix project.
topology
Package topology provides cluster topology monitoring for drain mode support.
types
Package types provides shared types and error definitions for the helix library.
