client

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: MIT Imports: 20 Imported by: 0

README

pkg/client

github.com/FrogoAI/fdb-client/pkg/client is the public FrogoDB smart client. Use it from applications that need to talk to FrogoDB without knowing the binary wire format directly.

The client connects to one or more seed nodes, discovers cluster topology in a background tend loop, caches the partition map, and routes requests directly to the owning node. It also provides connection pooling, optional multiplexing, read/write/batch policies, batch fan-out, pipelining, scans, counts, and operation helpers for server-side data types.

For normal application code, prefer this package over pkg/protocol. Use pkg/protocol only when building wire-level tests, tools, or transports.

Strengths

  • Smart routing: the client discovers cluster topology, keeps a cached partition map, computes digests locally, and routes key operations directly to the owning node when the map is available.
  • Broad SDK surface: single-record CRUD, atomic Operate, batch fan-out, pipelining, scans, approximate counts, filter expressions, and helpers for list, HyperLogLog, counting bloom filter, t-digest, and LSH operations.
  • Performance-oriented defaults: per-node LIFO connection pools, elastic burst connections, pooled protocol requests, optional multiplexed connections, and batch grouping by target node keep common paths deterministic and reduce avoidable allocation work.
  • Operational controls: context-aware calls, read/write/batch policies, topology inspection, pool warm-up, rich sentinel errors, and in-doubt write reporting give callers practical hooks for production behavior and tests.

Weaknesses / Tradeoffs

  • This is a stateful smart client. It starts a background tend loop, owns TCP pools, and must be closed. Early requests, keyless requests, or requests made before a partition map is available can fall back to the seed connection.
  • The public value model is intentionally narrow. Put accepts int64, int, float64, string, and []byte; advanced server-side types must be expressed through operation helpers or lower-level protocol operations.
  • Wire compatibility constrains behavior. Flags, result handling, write modes, TTL actions, scan cursors, and batch payloads need to stay aligned with the server protocol rather than with arbitrary client-side preferences.
  • Throughput features have usage constraints: Pipeline is not safe for concurrent use, multiplexing requires a server that supports the negotiated multiplexed protocol byte, and both modes still need appropriate deadlines.
  • Batch, scan, and count APIs expose distributed-system edges. Batch operations can return node-level errors and optional partial results, ScanAll depends on known cluster nodes and cursors, and count APIs return approximate counts.
  • Faster write options can weaken guarantees. WithCommitMaster trades lower write latency for weaker durability, and network failures after a write is sent can return an in-doubt error because the final server outcome is unknown.

Connect

Use New for the default configuration or NewWithConfig when timeouts, pool sizing, tend intervals, or multiplexing need to be explicit.

package main

import (
	"context"
	"log"
	"time"

	"github.com/FrogoAI/fdb-client/pkg/client"
)

func main() {
	cfg := client.DefaultConfig("127.0.0.1:3000")
	cfg.ConnectionTimeout = 2 * time.Second

	c, err := client.NewWithConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := c.Ping(ctx); err != nil {
		log.Fatal(err)
	}
}

GetConfigFromEnv builds the same Config from FDB_* environment variables. Unset values use DefaultConfig, with FDB_SEEDS defaulting to localhost:3000.

Records

Put, Get, and Delete cover single-record CRUD. Put accepts bin values of type int64, int, float64, string, and []byte. Returned records decode those particles as int64, float64, string, and []byte.

func saveUser(ctx context.Context, c *client.Client) error {
	err := c.Put(ctx, "app", "users", "u123", map[string]any{
		"name":  "Alice",
		"age":   int64(30),
		"score": 98.5,
	}, client.WithMergeBins(), client.WithTTL(3600))
	if err != nil {
		return err
	}

	rec, err := c.Get(ctx, "app", "users", "u123", "name", "age")
	if err != nil {
		return err
	}

	log.Printf("generation=%d name=%v age=%v", rec.Generation, rec.Bins["name"], rec.Bins["age"])
	return nil
}

Useful write options include:

  • WithMergeBins to update supplied bins and preserve untouched bins.
  • WithReplaceBins to replace the record with only supplied bins.
  • WithTTL, WithPreserveTTL, and WithClearTTL for expiration behavior.
  • WithCreateOnly and WithReplace for record existence preconditions.
  • WithGeneration for optimistic locking.
  • WithCommitMaster for lower-latency writes with weaker durability.

Operations

Operate executes multiple operations atomically against one record. Operation is an alias for protocol.Operation, and this package provides helpers for common operation payloads.

func incrementVisits(ctx context.Context, c *client.Client) (*client.Record, error) {
	lastSeen, err := client.WriteOp("last_seen", "2026-05-14T00:00:00Z")
	if err != nil {
		return nil, err
	}

	ops := []client.Operation{
		client.IncrIntOp("visits", 1),
		lastSeen,
		client.ReadOp("visits"),
	}

	return c.Operate(ctx, "app", "users", "u123", ops)
}

Additional helpers create operations for lists, HyperLogLog, counting bloom filters, and t-digest bins, such as ListAppendOp, HLLAddOp, BloomTestOp, and TDigestQuantileOp. LSHDedup and LSHVector send the server-side LSH requests and return the generated ID.

Batch, Pipeline, Scan, Count

Batch methods fan out by owning node and preserve input order: BatchGet, BatchPut, BatchDelete, and BatchOperate. Use WithConcurrentNodes to limit node concurrency and WithAllowPartialResults when partial results should be returned after a node-level failure.

func readUsers(ctx context.Context, c *client.Client) ([]*client.Record, error) {
	return c.BatchGet(ctx, []client.BatchKey{
		{Namespace: "app", Set: "users", Key: "u123", BinNames: []string{"name"}},
		{Namespace: "app", Set: "users", Key: "u456", BinNames: []string{"name"}},
	}, client.WithConcurrentNodes(8))
}

Pipeline queues multiple single-record requests and sends them on one TCP connection per target node, then reads responses in order. A pipeline is not safe for concurrent use.

Scan pages records on one route. ScanAll scans all known cluster nodes and uses an opaque ScanAllCursor to resume. Count and CountAll return approximate record counts.

func scanAdults(ctx context.Context, c *client.Client, cursor client.ScanAllCursor) (*client.ScanAllResult, error) {
	expr := protocol.ExpGreaterOrEqual(protocol.ExpIntBin("age"), protocol.ExpIntVal(18))
	return c.ScanAll(ctx, "app", "users", 100, cursor, client.WithScanFilter(expr))
}

The scan example imports github.com/FrogoAI/fdb-client/pkg/protocol for the filter expression builders. The same expression bytes can be passed to batch reads with WithFilterExpression.

Policies, Topology, Errors

SetReadPolicy, SetWritePolicy, and SetBatchPolicy update client defaults for future operations. Each operation snapshots the relevant policy before applying per-call options.

Topology inspection is available through Nodes, PartitionMapVersion, and Cluster().Nodes(). PartitionIDForKey and DigestForKey compute the same client-side routing values used by the partition map.

Errors are rich *client.Error values with sentinel instances such as ErrKeyNotFound, ErrKeyExists, ErrGenerationMismatch, ErrTimeout, and ErrInDoubt.

if errors.Is(err, client.ErrKeyNotFound) {
	return nil
}

var cerr *client.Error
if errors.As(err, &cerr) && cerr.IsInDoubt() {
	log.Printf("write outcome is unknown after %d retries", cerr.Iteration)
}

Documentation

Overview

Package client provides the FrogoDB smart client library.

The client maintains a local partition map, routes requests directly to the owning node (O(1) lookup), and provides connection pooling, batch fan-out, and a policy system.

Quick start:

c, err := client.New("localhost:3000")
defer c.Close()
err = c.Put(ctx, "ns", "set", "key", map[string]any{"name": "Alice"})
rec, err := c.Get(ctx, "ns", "set", "key")

Index

Constants

View Source
const (
	ListReturnNone  uint8 = 0
	ListReturnValue uint8 = 1
	ListReturnCount uint8 = 2
)

List return types for list remove operations.

View Source
const ListFlagAddUnique uint8 = 1 << 0

ListFlagAddUnique rejects the append if the value already exists in the list. Duplicate detection uses exact MessagePack byte equality with no coercion, so values such as "1", 1, and 1.0 are all distinct. Combine with ListFlagNoFail to silently skip duplicates instead of returning an error.

View Source
const ListFlagNoFail uint8 = 1 << 1

ListFlagNoFail suppresses the error returned by ListFlagAddUnique when a duplicate is detected. The append is silently skipped and the operation succeeds. Has no effect without ListFlagAddUnique.

Variables

View Source
var (
	ErrKeyNotFound          = &Error{Message: "key not found", ResultCode: ResultKeyNotFound}
	ErrKeyExists            = &Error{Message: "key already exists", ResultCode: ResultKeyExists}
	ErrGenerationMismatch   = &Error{Message: "generation mismatch", ResultCode: ResultGenerationError}
	ErrServerError          = &Error{Message: "server error", ResultCode: ResultServerError}
	ErrPartitionUnavailable = &Error{Message: "partition unavailable", ResultCode: ResultPartitionUnavailable}
	ErrInDoubt              = &Error{Message: "in doubt", ResultCode: ResultInDoubt, InDoubt: true}
	ErrOverloaded           = &Error{Message: "server overloaded", ResultCode: ResultOverloaded}
	ErrFilteredOut          = &Error{Message: "filtered out", ResultCode: ResultFilteredOut}
	ErrTimeout              = &Error{Message: "operation timeout", ResultCode: ResultTimeout}
	ErrClusterDown          = &Error{Message: "cluster down", ResultCode: ResultClusterDown}
	ErrUnsupportedType      = &Error{Message: "unsupported value type", ResultCode: ResultUnsupportedType}
	ErrNoSeeds              = &Error{Message: "no seed addresses provided", ResultCode: ResultNoSeeds}
	ErrClosed               = &Error{Message: "client is closed", ResultCode: ResultClosed}
)

Sentinel errors.

Functions

func DigestForKey

func DigestForKey(set, key string) [20]byte

DigestForKey computes RIPEMD-160 digest for a string key in a set.

func PartitionIDForKey

func PartitionIDForKey(set, key string) uint16

PartitionIDForKey computes the partition ID for a key without a cluster lookup.

Types

type BasePolicy

type BasePolicy struct {
	TotalTimeout        time.Duration // overall operation deadline; default 1s
	SocketTimeout       time.Duration // per-socket I/O deadline; default 30s
	MaxRetries          int           // retry attempts on transient failure; default 2 reads, 0 writes
	SleepBetweenRetries time.Duration // pause between retries; default 500ms
	ReplicaPolicy       ReplicaPolicy // read routing target; default ReplicaMaster
}

BasePolicy contains common timeout and retry settings shared by all operations.

func DefaultReadPolicy

func DefaultReadPolicy() BasePolicy

DefaultReadPolicy returns a BasePolicy tuned for read operations.

func DefaultWriteBasePolicy

func DefaultWriteBasePolicy() BasePolicy

DefaultWriteBasePolicy returns a BasePolicy tuned for write operations.

type BatchDeleteKey

type BatchDeleteKey struct {
	Namespace string
	Set       string
	Key       string
}

BatchDeleteKey identifies a key for a batch DELETE operation.

type BatchError

type BatchError struct {
	Errors map[int]error
}

BatchError reports per-key errors from a batch operation.

func (*BatchError) Error

func (e *BatchError) Error() string

Error implements the error interface.

type BatchKey

type BatchKey struct {
	Namespace string
	Set       string
	Key       string
	BinNames  []string
}

BatchKey identifies a key for a batch GET operation.

type BatchOperate

type BatchOperate struct {
	Namespace  string
	Set        string
	Key        string
	Operations []Operation
	TTL        uint32
	Generation uint32
}

BatchOperate describes a key and operations for a batch OPERATE.

type BatchOption

type BatchOption func(*batchOptions)

BatchOption configures a batch operation.

func WithAllowPartialResults

func WithAllowPartialResults() BatchOption

WithAllowPartialResults allows the batch to return partial results on node-level failures instead of failing the entire batch.

func WithBatchPolicy

func WithBatchPolicy(p BatchPolicy) BatchOption

WithBatchPolicy applies all fields from a BatchPolicy to this batch.

func WithConcurrentNodes

func WithConcurrentNodes(n int) BatchOption

WithConcurrentNodes limits the number of nodes contacted in parallel. 0 (default) means unlimited, 1 means sequential.

func WithFilterExpression

func WithFilterExpression(expr []byte) BatchOption

WithFilterExpression sets a server-side filter expression for the batch. Records that do not match the filter are returned with ResultFilteredOut. Build expressions using protocol.ExpGreater, protocol.ExpIntBin, etc.

type BatchPolicy

type BatchPolicy struct {
	BasePolicy
	ConcurrentNodes     int  // max parallel node requests; 0 = unlimited
	AllowPartialResults bool // return partial results on node failure
}

BatchPolicy controls behavior of batch operations.

func DefaultBatchPolicy

func DefaultBatchPolicy() BatchPolicy

DefaultBatchPolicy returns a BatchPolicy with sensible defaults.

type BatchWrite

type BatchWrite struct {
	Namespace          string
	Set                string
	Key                string
	Bins               map[string]any
	TTL                uint32
	Generation         uint32
	RecordExistsAction RecordExistsAction
	RecordWriteMode    RecordWriteMode
	TTLAction          TTLAction
}

BatchWrite describes a record to write in a batch PUT operation.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the FrogoDB smart client.

It connects to one or more seed nodes, discovers the cluster topology via a background tend loop, and routes each request directly to the node that owns the target partition. Each node maintains a LIFO connection pool for high connection reuse.

func New

func New(seeds ...string) (*Client, error)

New creates a new Client connected to the given seed addresses.

func NewWithConfig

func NewWithConfig(config Config) (*Client, error)

NewWithConfig creates a new Client with the given configuration.

func (*Client) BatchDelete

func (c *Client) BatchDelete(ctx context.Context, keys []BatchDeleteKey, opts ...BatchOption) ([]bool, error)

BatchDelete deletes multiple records in parallel, fanning out by owning node. Returns a bool slice in the same order as keys: true if the record existed before deletion, false if not found. On node-level failure the returned error is non-nil; with AllowPartialResults the results slice is still populated for nodes that succeeded.

func (*Client) BatchGet

func (c *Client) BatchGet(ctx context.Context, keys []BatchKey, opts ...BatchOption) ([]*Record, error)

BatchGet retrieves multiple records in parallel, fanning out by owning node. Results are returned in the same order as keys. A nil *Record at position i means the key was not found. On node-level failure the returned error is non-nil; with AllowPartialResults the records slice is still populated for nodes that succeeded.

func (*Client) BatchOperate

func (c *Client) BatchOperate(ctx context.Context, records []BatchOperate, opts ...BatchOption) ([]*Record, error)

BatchOperate executes operations on multiple records in parallel, fanning out by owning node. Results are returned in the same order as records.

func (*Client) BatchPut

func (c *Client) BatchPut(ctx context.Context, records []BatchWrite, opts ...BatchOption) error

BatchPut writes multiple records in parallel, fanning out by owning node. On node-level failure the returned error is non-nil; with AllowPartialResults writes to reachable nodes still succeed.

func (*Client) Close

func (c *Client) Close() error

Close stops the tend loop and closes all connections.

func (*Client) Cluster

func (c *Client) Cluster() *Cluster

Cluster returns the cluster manager for topology inspection.

func (*Client) Count

func (c *Client) Count(ctx context.Context, ns, set string) (int64, error)

Count returns the approximate record count for a namespace, optionally filtered by set. Pass an empty set to count all records in the namespace.

func (*Client) CountAll

func (c *Client) CountAll(ctx context.Context, ns, set string) (int64, error)

CountAll returns the approximate logical record count for a namespace/set across all known cluster nodes. It fans out node-local count requests with primary-only filtering enabled so replica copies are not double-counted. When no partition map is available, it falls back to the connected node.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, ns, set, key string, opts ...WriteOption) (bool, error)

Delete removes a record. Returns true if the record existed before deletion.

func (*Client) Get

func (c *Client) Get(ctx context.Context, ns, set, key string, binNames ...string) (*Record, error)

Get reads a record. If binNames are specified, only those bins are returned. The client's default ReadPolicy controls timeouts, retries, and replica routing. Use SetReadPolicy to override defaults.

func (*Client) GetPartitionMap

func (c *Client) GetPartitionMap(ctx context.Context) (*PartitionMapInfo, error)

GetPartitionMap requests the full partition map from the server.

func (*Client) LSHDedup

func (c *Client) LSHDedup(ctx context.Context, ns, group, input string, opts ...WriteOption) (string, error)

LSHDedup sends an LSH string deduplication request to the server. The server computes the LSH bucket for the input string and returns the client-visible bucket ID. That ID is the historical lowercase hex value from the first eight bytes of SHA-1 over "<dedup-scope>:<input>"; embedded LSH storage IDs used by the server remain internal. The group parameter selects the dedup scope, with the server treating the query-helper group "lsh_dedup" as namespace-scoped.

func (*Client) LSHVector

func (c *Client) LSHVector(
	ctx context.Context,
	ns, group string,
	vector []float64,
	opts ...WriteOption,
) (string, error)

LSHVector sends an LSH vector request to the server. The server computes the LSH bucket for the float64 vector and returns the behavioural ID. The group parameter namespaces different LSH indexes. Vector wire format: packed big-endian float64 values.

func (*Client) NodeForKey

func (c *Client) NodeForKey(ns, set, key string, replica bool) string

NodeForKey returns the address of the node that owns the partition for the given key. Returns "" if no partition map is available.

func (*Client) Nodes added in v1.2.0

func (c *Client) Nodes() []string

Nodes returns the sorted addresses of currently known cluster nodes.

func (*Client) Operate

func (c *Client) Operate(
	ctx context.Context,
	ns, set, key string,
	ops []Operation,
	opts ...WriteOption,
) (*Record, error)

Operate executes a list of operations atomically on a single record.

func (*Client) PartitionMapVersion added in v1.2.0

func (c *Client) PartitionMapVersion() int64

PartitionMapVersion returns the cached partition map version, or 0 when the client has not received a partition map yet.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping sends a health-check request to the server.

func (*Client) PingAddr

func (c *Client) PingAddr(ctx context.Context, addr string) error

PingAddr sends a health-check to a specific node address by opening a temporary connection, sending OpPing, and verifying the result.

func (*Client) Pipeline

func (c *Client) Pipeline() *Pipeline

Pipeline creates a new Pipeline for batching multiple requests on a single TCP connection without waiting for individual responses. See Pipeline.Execute for details.

func (*Client) Put

func (c *Client) Put(ctx context.Context, ns, set, key string, bins map[string]any, opts ...WriteOption) error

Put writes a record with the given bins. Bin values must be int64, int, float64, string, or []byte.

func (*Client) Scan

func (c *Client) Scan(
	ctx context.Context,
	ns, set string,
	limit int,
	cursor uint32,
	opts ...ScanOption,
) (*ScanResult, error)

Scan iterates records in a namespace (optionally filtered by set) up to limit. Pass cursor=0 to start a new scan; use ScanResult.NextCursor to continue. A NextCursor of 0 with fewer records than limit indicates the scan is complete.

func (*Client) ScanAll

func (c *Client) ScanAll(
	ctx context.Context,
	ns, set string,
	limit int,
	cursor ScanAllCursor,
	opts ...ScanOption,
) (*ScanAllResult, error)

ScanAll scans records across all known cluster nodes using an opaque cursor that tracks per-node local scan progress. When no partition map is available, it falls back to scanning the connected node only.

func (*Client) SetBatchPolicy

func (c *Client) SetBatchPolicy(p BatchPolicy)

SetBatchPolicy sets the default batch policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy before applying per-call options.

func (*Client) SetReadPolicy

func (c *Client) SetReadPolicy(p BasePolicy)

SetReadPolicy sets the default read policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy.

func (*Client) SetWritePolicy

func (c *Client) SetWritePolicy(p WritePolicy)

SetWritePolicy sets the default write policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy before applying per-call options.

func (*Client) WarmUp

func (c *Client) WarmUp(ctx context.Context) error

WarmUp pre-establishes connections to all known nodes, filling each connection pool to capacity. This eliminates the TCP dial cost on the first requests after client creation.

type Cluster added in v1.2.0

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster manages the set of known server nodes and a cached partition map. The tend loop periodically fetches the partition map from any reachable node, detects node joins/leaves, and updates the local copy-on-write node list.

func (*Cluster) NodeForAddress added in v1.2.0

func (cl *Cluster) NodeForAddress(address string) *Node

NodeForAddress returns the node with the given address, or nil.

func (*Cluster) Nodes added in v1.2.0

func (cl *Cluster) Nodes() []*Node

Nodes returns a snapshot of the known cluster nodes. The returned slice is a copy; Node values are read-only through their exported methods.

func (*Cluster) PartitionMapSnapshot added in v1.2.0

func (cl *Cluster) PartitionMapSnapshot() *PartitionMapInfo

PartitionMapSnapshot returns the latest cached partition map (may be nil).

func (*Cluster) PartitionMapVersion added in v1.2.0

func (cl *Cluster) PartitionMapVersion() int64

PartitionMapVersion returns the cached partition map version, or 0 when the client has not received a partition map yet.

type CommitPolicy

type CommitPolicy uint8

CommitPolicy controls the replication durability guarantee for write operations.

const (
	// CommitAll waits for local write and replica ACK (default, strongest durability).
	CommitAll CommitPolicy = 0

	// CommitMaster returns after local write, replicates asynchronously.
	// Trades durability for latency: master failure before replication loses the write.
	CommitMaster CommitPolicy = 1
)

type Config

type Config struct {
	Seeds             []string
	TendInterval      time.Duration
	ConnectionTimeout time.Duration
	IdleTimeout       time.Duration
	PoolSizePerNode   int // persistent (base) connections kept warm per node
	MaxConnsPerNode   int // hard upper bound on total concurrent connections per node (0 = same as PoolSizePerNode)
	MaxErrorRate      int
	ErrorRateWindow   time.Duration

	// Multiplexing enables multiplexed connections (T17.4). When true, the
	// client sends requests without waiting for individual responses. Multiple
	// goroutines share a fixed set of connections (no pool semaphore overhead).
	Multiplexing             bool
	MultiplexConnsPerNode    int // max connections per node in multiplexed mode (default 16)
	MultiplexMinConnsPerNode int
}

Config holds the client configuration.

func DefaultConfig

func DefaultConfig(seeds ...string) Config

DefaultConfig returns a Config with sensible defaults for the given seed addresses.

Connection pool uses elastic scaling (inspired by FrogoAI/mq-balancer):

  • PoolSizePerNode (64) persistent connections are kept warm in the pool
  • MaxConnsPerNode (256) is the hard ceiling under burst load
  • Temporary connections beyond PoolSizePerNode are created on demand and auto-close when returned (not kept in pool)

func GetConfigFromEnv added in v1.2.0

func GetConfigFromEnv() (Config, error)

GetConfigFromEnv returns a Config built from FDB_* environment variables.

Unset values use the same defaults as DefaultConfig, with FDB_SEEDS defaulting to localhost:3000 for local development. Invalid set values return an error that names the variable and wraps the parse or validation failure.

type Error

type Error struct {
	Message    string
	ResultCode ResultCode
	Node       string
	InDoubt    bool
	Iteration  int
	// contains filtered or unexported fields
}

Error is a rich error returned by client operations. It carries the result code, target node address, in-doubt flag for writes whose outcome is unknown, and the retry iteration count.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface.

func (*Error) Is

func (e *Error) Is(target error) bool

Is supports errors.Is by matching on ResultCode.

func (*Error) IsInDoubt

func (e *Error) IsInDoubt() bool

IsInDoubt returns true when a write may or may not have succeeded.

func (*Error) Matches

func (e *Error) Matches(codes ...ResultCode) bool

Matches returns true if the error's ResultCode is any of the given codes.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap / errors.Is chains.

type Node added in v1.2.0

type Node struct {
	// contains filtered or unexported fields
}

Node represents a single server node in the cluster. Each node owns a LIFO connection pool for request multiplexing and a circuit breaker that fast-fails requests when the node is unreachable.

func (*Node) Active added in v1.2.0

func (n *Node) Active() bool

Active reports whether the client currently considers the node routable.

func (*Node) Address added in v1.2.0

func (n *Node) Address() string

Address returns the node service address.

type Operation

type Operation = protocol.Operation

Operation represents an operation for OPERATE requests.

It is the public client name for the shared protocol operation wire shape.

func AppendStringOp

func AppendStringOp(binName string, value string) Operation

AppendStringOp creates an APPEND operation on a String bin.

func BloomAddOp

func BloomAddOp(binName string, element []byte) Operation

BloomAddOp creates an operation that adds an element to a counting bloom filter bin. If the bin does not exist, the server creates a default bloom.

func BloomInitOp

func BloomInitOp(binName string, n uint32, p float64) Operation

BloomInitOp creates an operation that initializes a new counting bloom filter with the given capacity and false positive rate. n is the expected number of items; p is the target false positive rate (e.g. 0.01).

func BloomRemoveOp

func BloomRemoveOp(binName string, element []byte) Operation

BloomRemoveOp creates an operation that removes an element from a counting bloom filter bin by decrementing the relevant counters.

func BloomResetOp

func BloomResetOp(binName string) Operation

BloomResetOp creates an operation that clears all counters in a bloom filter bin while preserving the filter configuration.

func BloomTestOp

func BloomTestOp(binName string, element []byte) Operation

BloomTestOp creates an operation that tests whether an element is a member of a bloom filter bin. This is a read-only operation — the bin state is not modified. The result is returned as a bool in the response record's bin map.

func HLLAddOp

func HLLAddOp(binName string, values ...[]byte) Operation

HLLAddOp creates an operation that adds one or more elements to an HLL bin. Multiple values are batched into a single operation, amortizing the deserialization/serialization cost on the server (one FromBytes + one ToBytes regardless of how many elements are added).

If the bin does not exist, the server creates a default HLL (log2m=14).

func HLLAddWithConfigOp

func HLLAddWithConfigOp(binName string, indexBitCount, regwidth uint8, values ...[]byte) Operation

HLLAddWithConfigOp creates an operation that adds elements to an HLL bin, using custom indexBitCount and regwidth on first create. If the bin already exists, the config is ignored and the existing HLL settings are preserved.

Args wire format: [indexBitCount:1B][regwidth:1B][count:4B][len:4B][val]...

Size reference (indexBitCount → bytes):

10 → ~1 KB (3.25% error)
14 → ~16 KB (0.81% error, default)

func HLLCountOp

func HLLCountOp(binName string) Operation

HLLCountOp creates an operation that returns the estimated cardinality of an HLL bin. This is a read-only operation — the bin state is not modified. The result is returned as an int64 in the response record's bin map.

func HLLInitOp

func HLLInitOp(binName string, indexBitCount, regwidth uint8) Operation

HLLInitOp creates an operation that initializes a new HLL bin with the given precision parameters. indexBitCount controls accuracy (typically 4-18, default 14). regwidth controls register width (typically 6).

func HLLIntersectCountOp

func HLLIntersectCountOp(binName string, otherHLL []byte) Operation

HLLIntersectCountOp creates an operation that estimates the intersection cardinality between the target bin's HLL and another HLL (provided as raw serialized bytes). Uses the inclusion-exclusion formula: |A ∩ B| = |A| + |B| - |A ∪ B|. The bin state is not modified.

func HLLUnionCountOp

func HLLUnionCountOp(binName string, hlls ...[]byte) Operation

HLLUnionCountOp creates an operation that unions the target bin's HLL with one or more other HLLs (provided as raw serialized bytes) and returns the combined cardinality. The bin state is not modified.

func HLLUnionOp

func HLLUnionOp(binName string, otherHLL []byte) Operation

HLLUnionOp creates an operation that merges another HLL (provided as raw serialized bytes) into the target bin.

func IncrFloatOp

func IncrFloatOp(binName string, delta float64) Operation

IncrFloatOp creates an INCR operation on a Float bin with the given delta.

func IncrIntOp

func IncrIntOp(binName string, delta int64) Operation

IncrIntOp creates an INCR operation on an Integer bin with the given delta.

func ListAppendItemsOp

func ListAppendItemsOp(binName string, values []any, flags ...uint8) (Operation, error)

ListAppendItemsOp creates an operation that appends multiple values to a list bin using one server-side list operation. The values are serialized as a MessagePack array. Optional flags use the same semantics as ListAppendOp.

func ListAppendOp

func ListAppendOp(binName string, value any, flags ...uint8) (Operation, error)

ListAppendOp creates an operation that appends a value to a list bin. The value is serialized using MessagePack. If the bin does not exist, the server creates a new list. Optional flags control write behavior — pass ListFlagAddUnique|ListFlagNoFail for idempotent appends. With no flags the default is writeFlags=0 (unconditional append, backward compatible).

func ListGetByIndexOp

func ListGetByIndexOp(binName string, index int32) Operation

ListGetByIndexOp creates an operation that returns the element at the given index. Negative index counts from the end. This is a read-only operation.

func ListInsertOp

func ListInsertOp(binName string, index int32, value any) (Operation, error)

ListInsertOp creates an operation that inserts a value at the given index. Negative index counts from the end of the list. Returns an error if the value cannot be serialized.

func ListRemoveByIndexOp

func ListRemoveByIndexOp(binName string, index int32) Operation

ListRemoveByIndexOp creates an operation that removes the element at the given index and returns the removed value. Negative index counts from the end.

func ListRemoveByIndexRangeOp

func ListRemoveByIndexRangeOp(binName string, index, count int32) Operation

ListRemoveByIndexRangeOp creates an operation that removes count elements starting at index and returns the removed values. Negative index counts from the end.

func ListRemoveByValueOp

func ListRemoveByValueOp(binName string, value any, returnType ...uint8) (Operation, error)

ListRemoveByValueOp creates an operation that removes every top-level list element whose MessagePack encoding exactly matches value. The default return type is ListReturnValue for parity with the existing remove helpers. Pass one of ListReturnNone, ListReturnValue, or ListReturnCount to override it.

func ListSizeOp

func ListSizeOp(binName string) Operation

ListSizeOp creates an operation that returns the number of elements in a list bin. This is a read-only operation.

func ReadOp

func ReadOp(binName string) Operation

ReadOp creates a READ operation to retrieve a bin value.

func TDigestAddOp

func TDigestAddOp(binName string, value, weight float64) Operation

TDigestAddOp creates an operation that adds a weighted sample to a t-digest bin. If the bin does not exist, the server creates a default t-digest. weight is converted to a uint64 count on the server (minimum 1).

func TDigestAddWithCompressionOp

func TDigestAddWithCompressionOp(binName string, value, weight float64, compression uint32) Operation

TDigestAddWithCompressionOp creates an operation that adds a weighted sample to a t-digest bin, using custom compression on first create. If the bin already exists, the compression param is ignored.

Args wire format: [compression:4B][value:8B][weight:8B]

Size reference (compression → approx bytes):

25  → ~1 KB (matches HLL at indexBitCount=10)
100 → ~3 KB (default)

func TDigestCDFOp

func TDigestCDFOp(binName string, value float64) Operation

TDigestCDFOp creates an operation that returns the estimated cumulative distribution function value for the given data point. This is a read-only operation. The result is the fraction of values less than or equal to value.

func TDigestCountOp

func TDigestCountOp(binName string) Operation

TDigestCountOp creates an operation that returns the total count of samples in a t-digest bin. This is a read-only operation.

func TDigestMaxOp

func TDigestMaxOp(binName string) Operation

TDigestMaxOp creates an operation that returns the maximum value in a t-digest bin. This is a read-only operation.

func TDigestMergeOp

func TDigestMergeOp(binName string, otherTDigest []byte) Operation

TDigestMergeOp creates an operation that merges another t-digest (provided as raw serialized bytes) into the target bin.

func TDigestMinOp

func TDigestMinOp(binName string) Operation

TDigestMinOp creates an operation that returns the minimum value in a t-digest bin. This is a read-only operation.

func TDigestQuantileOp

func TDigestQuantileOp(binName string, p float64) Operation

TDigestQuantileOp creates an operation that returns the estimated value at the given quantile (0.0–1.0). This is a read-only operation.

func WriteOp

func WriteOp(binName string, value any) (Operation, error)

WriteOp creates a WRITE operation for a bin.

func WriteOpCreateOnly

func WriteOpCreateOnly(binName string, value any) (Operation, error)

WriteOpCreateOnly creates a WRITE operation that only writes if the bin does not already have a value. Used for FirstSeen semantics.

type PartitionMapEntry

type PartitionMapEntry struct {
	MasterAddr  string
	ReplicaAddr string
}

PartitionMapEntry holds master and replica service addresses for a single partition.

type PartitionMapInfo

type PartitionMapInfo struct {
	Version    int64
	NodeCount  int
	RosterHash uint64
	Entries    []PartitionMapEntry
}

PartitionMapInfo represents a decoded partition map from the server.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline batches multiple requests and sends them on a single TCP connection without waiting for individual responses (request pipelining). Responses are read in order after all requests are written, eliminating per-request round-trip latency.

Requests targeting different nodes are automatically grouped and pipelined per node.

Pipeline is NOT safe for concurrent use.

func (*Pipeline) Delete

func (p *Pipeline) Delete(ns, set, key string, opts ...WriteOption)

Delete queues a DELETE request in the pipeline.

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context) ([]PipelineResult, error)

Execute sends all queued requests and reads their responses. Requests targeting different nodes are grouped and pipelined per node. Results are returned in the same order that requests were added.

After Execute returns, the pipeline is empty and may be reused.

func (*Pipeline) Get

func (p *Pipeline) Get(ns, set, key string, binNames ...string)

Get queues a GET request in the pipeline.

func (*Pipeline) Len

func (p *Pipeline) Len() int

Len returns the number of queued requests.

func (*Pipeline) Operate

func (p *Pipeline) Operate(ns, set, key string, ops []Operation, opts ...WriteOption)

Operate queues an OPERATE request in the pipeline.

func (*Pipeline) Put

func (p *Pipeline) Put(ns, set, key string, bins map[string]any, opts ...WriteOption) error

Put queues a PUT request in the pipeline.

type PipelineResult

type PipelineResult struct {
	// Record is populated for GET and OPERATE responses with ResultOK.
	// Nil for PUT and DELETE responses.
	Record *Record
	// Err is non-nil when the request failed (network error or server result code).
	Err error
}

PipelineResult holds the outcome of a single request within a Pipeline.

type Record

type Record struct {
	Generation uint32
	Bins       map[string]any
}

Record represents a record returned by the server.

type RecordExistsAction

type RecordExistsAction uint8

RecordExistsAction determines only the existence precondition for a write.

It is intentionally orthogonal to the T26 write-mode contract: merge/update vs full-record replace must be expressed separately because the existing "replace" action already means "require that the record exists".

const (
	// RecordUpdate allows the write on both missing and existing records
	// (subject to the selected write mode).
	RecordUpdate RecordExistsAction = 0

	// RecordCreateOnly fails with ErrKeyExists if the record already exists.
	RecordCreateOnly RecordExistsAction = 1

	// RecordReplace fails with ErrKeyNotFound if the record does not exist.
	// Despite the name, this is an existence check, not the future T26
	// full-record replace mode.
	RecordReplace RecordExistsAction = 2
)

type RecordWriteMode

type RecordWriteMode uint8

RecordWriteMode controls how PUT-style writes treat bins that are not present in the request payload. This applies to Put and BatchPut only.

const (
	// RecordWriteModeReplace keeps the legacy behavior: supplied bins become the
	// full record value.
	RecordWriteModeReplace RecordWriteMode = 0

	// RecordWriteModeMerge requests merge/update semantics: overlay supplied
	// bins onto the existing record and preserve untouched bins.
	RecordWriteModeMerge RecordWriteMode = 1
)

type ReplicaPolicy

type ReplicaPolicy uint8

ReplicaPolicy determines which node a read operation targets.

const (
	// ReplicaMaster reads from the partition master (default).
	ReplicaMaster ReplicaPolicy = 0

	// ReplicaMasterOrReplica reads from master or replica.
	ReplicaMasterOrReplica ReplicaPolicy = 1
)

type ResultCode

type ResultCode uint8

ResultCode identifies the outcome of a client or server operation.

const (
	// ResultOK indicates the operation completed successfully.
	ResultOK ResultCode = 0x00
	// ResultKeyNotFound indicates the requested key does not exist.
	ResultKeyNotFound ResultCode = 0x02
	// ResultKeyExists indicates a create-only write failed because the key already exists.
	ResultKeyExists ResultCode = 0x05
	// ResultGenerationError indicates an optimistic-lock failure due to generation mismatch.
	ResultGenerationError ResultCode = 0x06
	// ResultPartitionUnavailable indicates the target partition is temporarily unavailable.
	ResultPartitionUnavailable ResultCode = 0x14
	// ResultOverloaded indicates the server rejected the request due to backpressure (T17.6).
	ResultOverloaded ResultCode = 0x18
	// ResultFilteredOut indicates the record was excluded by a filter expression.
	ResultFilteredOut ResultCode = 0x1A
	// ResultInDoubt indicates the write outcome is unknown (may or may not have succeeded).
	ResultInDoubt ResultCode = 0x28
	// ResultServerError indicates an unspecified server-side error.
	ResultServerError ResultCode = 0x50

	// ResultTimeout indicates the operation exceeded its deadline.
	ResultTimeout ResultCode = 0xF0
	// ResultClusterDown indicates no cluster nodes are reachable.
	ResultClusterDown ResultCode = 0xF1
	// ResultUnsupportedType indicates the value type cannot be serialized.
	ResultUnsupportedType ResultCode = 0xF2
	// ResultClosed indicates the client has been closed.
	ResultClosed ResultCode = 0xF3
	// ResultNoSeeds indicates no seed addresses were provided.
	ResultNoSeeds ResultCode = 0xF4
)

type ScanAllCursor

type ScanAllCursor string

ScanAllCursor is an opaque pagination token for distributed scans.

type ScanAllResult

type ScanAllResult struct {
	Records    []Record
	NextCursor ScanAllCursor
}

ScanAllResult holds one page of distributed scan records and the cursor to resume the cluster-wide scan.

type ScanOption

type ScanOption func(*scanConfig)

ScanOption configures a scan operation.

func WithScanFilter

func WithScanFilter(expr []byte) ScanOption

WithScanFilter sets a server-side filter expression for the scan. Records that do not match the filter are skipped. Build expressions using protocol.ExpGreater, protocol.ExpIntBin, etc.

type ScanResult

type ScanResult struct {
	Records    []Record
	NextCursor uint32
}

ScanResult holds the records returned by a scan and a cursor for pagination.

type TTLAction

type TTLAction uint8

TTLAction controls how PUT-style writes treat record expiration metadata. This applies to Put and BatchPut only.

const (
	// TTLActionClear keeps the legacy behavior: TTL 0 means no expiration.
	TTLActionClear TTLAction = 0

	// TTLActionPreserve keeps the existing TTL when the record already exists.
	TTLActionPreserve TTLAction = 1

	// TTLActionSet applies the TTL field as an explicit TTL value.
	TTLActionSet TTLAction = 2
)

type WriteOption

type WriteOption func(*writeOptions)

WriteOption configures a write operation.

func WithClearTTL

func WithClearTTL() WriteOption

WithClearTTL removes expiration from the written record.

func WithCommitMaster

func WithCommitMaster() WriteOption

WithCommitMaster sets the commit policy to return after local write only, replicating asynchronously. This reduces write latency at the cost of durability: if the master fails before replication completes, the write is lost.

func WithCreateOnly

func WithCreateOnly() WriteOption

WithCreateOnly makes the write fail with ErrKeyExists if the record already exists.

func WithGeneration

func WithGeneration(generation uint32) WriteOption

WithGeneration sets the expected generation for optimistic locking. The write fails with ErrGenerationMismatch if the server-side generation differs.

func WithMaxRetries

func WithMaxRetries(n int) WriteOption

WithMaxRetries sets the maximum retry attempts for the write operation.

func WithMergeBins

func WithMergeBins() WriteOption

WithMergeBins requests merge/update semantics for PUT/BatchPut.

func WithPreserveTTL

func WithPreserveTTL() WriteOption

WithPreserveTTL keeps the existing record TTL when the record already exists. New records are created with no expiration.

func WithRecordWriteMode

func WithRecordWriteMode(mode RecordWriteMode) WriteOption

WithRecordWriteMode sets the PUT/BatchPut bin merge policy.

func WithReplace

func WithReplace() WriteOption

WithReplace makes the write fail with ErrKeyNotFound if the record does not exist.

This preserves the existing "require record existence" meaning. T26's explicit full-record replace mode must remain a separate option so this method is not overloaded with two different semantics.

func WithReplaceBins

func WithReplaceBins() WriteOption

WithReplaceBins requests explicit full-record replacement for PUT/BatchPut.

func WithSleepBetweenRetries

func WithSleepBetweenRetries(d time.Duration) WriteOption

WithSleepBetweenRetries sets the pause duration between retry attempts.

func WithSocketTimeout

func WithSocketTimeout(d time.Duration) WriteOption

WithSocketTimeout sets the per-socket I/O deadline for the write operation.

func WithTTL

func WithTTL(seconds uint32) WriteOption

WithTTL sets the time-to-live in seconds.

A non-zero value means "set this TTL". Zero keeps the legacy "clear TTL / no expiration" behavior so existing callers retain their current contract.

func WithTotalTimeout

func WithTotalTimeout(d time.Duration) WriteOption

WithTotalTimeout sets the overall operation deadline for the write operation.

func WithWritePolicy

func WithWritePolicy(p WritePolicy) WriteOption

WithWritePolicy applies all fields from a WritePolicy to the write options.

type WritePolicy

type WritePolicy struct {
	BasePolicy
	Generation         uint32             // expected generation for optimistic locking (0 = ignore)
	TTL                uint32             // TTL in seconds when TTLAction == TTLActionSet
	RecordExistsAction RecordExistsAction // behavior when record already exists
	RecordWriteMode    RecordWriteMode    // Put/BatchPut only: replace (legacy) or merge
	TTLAction          TTLAction          // Put/BatchPut only: clear (legacy), preserve, or set TTL
	CommitPolicy       CommitPolicy       // replication durability: CommitAll (default) or CommitMaster
}

WritePolicy controls behavior of write operations (Put, Delete, Operate).

func DefaultWritePolicy

func DefaultWritePolicy() WritePolicy

DefaultWritePolicy returns a WritePolicy with sensible defaults.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL