Documentation
¶
Overview ¶
Package client provides the FrogoDB smart client library.
The client maintains a local partition map, routes requests directly to the owning node (O(1) lookup), and provides connection pooling, batch fan-out, and a policy system.
Quick start:
c, err := client.New("localhost:3000")
defer c.Close()
err = c.Put(ctx, "ns", "set", "key", map[string]any{"name": "Alice"})
rec, err := c.Get(ctx, "ns", "set", "key")
Index ¶
- Constants
- Variables
- func DigestForKey(set, key string) [20]byte
- func PartitionIDForKey(set, key string) uint16
- type BasePolicy
- type BatchDeleteKey
- type BatchError
- type BatchKey
- type BatchOperate
- type BatchOption
- type BatchPolicy
- type BatchWrite
- type Client
- func (c *Client) BatchDelete(ctx context.Context, keys []BatchDeleteKey, opts ...BatchOption) ([]bool, error)
- func (c *Client) BatchGet(ctx context.Context, keys []BatchKey, opts ...BatchOption) ([]*Record, error)
- func (c *Client) BatchOperate(ctx context.Context, records []BatchOperate, opts ...BatchOption) ([]*Record, error)
- func (c *Client) BatchPut(ctx context.Context, records []BatchWrite, opts ...BatchOption) error
- func (c *Client) Close() error
- func (c *Client) Cluster() *Cluster
- func (c *Client) Count(ctx context.Context, ns, set string) (int64, error)
- func (c *Client) CountAll(ctx context.Context, ns, set string) (int64, error)
- func (c *Client) Delete(ctx context.Context, ns, set, key string, opts ...WriteOption) (bool, error)
- func (c *Client) Get(ctx context.Context, ns, set, key string, binNames ...string) (*Record, error)
- func (c *Client) GetPartitionMap(ctx context.Context) (*PartitionMapInfo, error)
- func (c *Client) LSHDedup(ctx context.Context, ns, group, input string, opts ...WriteOption) (string, error)
- func (c *Client) LSHVector(ctx context.Context, ns, group string, vector []float64, opts ...WriteOption) (string, error)
- func (c *Client) NodeForKey(ns, set, key string, replica bool) string
- func (c *Client) Nodes() []string
- func (c *Client) Operate(ctx context.Context, ns, set, key string, ops []Operation, opts ...WriteOption) (*Record, error)
- func (c *Client) PartitionMapVersion() int64
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PingAddr(ctx context.Context, addr string) error
- func (c *Client) Pipeline() *Pipeline
- func (c *Client) Put(ctx context.Context, ns, set, key string, bins map[string]any, ...) error
- func (c *Client) Scan(ctx context.Context, ns, set string, limit int, cursor uint32, ...) (*ScanResult, error)
- func (c *Client) ScanAll(ctx context.Context, ns, set string, limit int, cursor ScanAllCursor, ...) (*ScanAllResult, error)
- func (c *Client) SetBatchPolicy(p BatchPolicy)
- func (c *Client) SetReadPolicy(p BasePolicy)
- func (c *Client) SetWritePolicy(p WritePolicy)
- func (c *Client) WarmUp(ctx context.Context) error
- type Cluster
- type CommitPolicy
- type Config
- type Error
- type Node
- type Operation
- func AppendStringOp(binName string, value string) Operation
- func BloomAddOp(binName string, element []byte) Operation
- func BloomInitOp(binName string, n uint32, p float64) Operation
- func BloomRemoveOp(binName string, element []byte) Operation
- func BloomResetOp(binName string) Operation
- func BloomTestOp(binName string, element []byte) Operation
- func HLLAddOp(binName string, values ...[]byte) Operation
- func HLLAddWithConfigOp(binName string, indexBitCount, regwidth uint8, values ...[]byte) Operation
- func HLLCountOp(binName string) Operation
- func HLLInitOp(binName string, indexBitCount, regwidth uint8) Operation
- func HLLIntersectCountOp(binName string, otherHLL []byte) Operation
- func HLLUnionCountOp(binName string, hlls ...[]byte) Operation
- func HLLUnionOp(binName string, otherHLL []byte) Operation
- func IncrFloatOp(binName string, delta float64) Operation
- func IncrIntOp(binName string, delta int64) Operation
- func ListAppendItemsOp(binName string, values []any, flags ...uint8) (Operation, error)
- func ListAppendOp(binName string, value any, flags ...uint8) (Operation, error)
- func ListGetByIndexOp(binName string, index int32) Operation
- func ListInsertOp(binName string, index int32, value any) (Operation, error)
- func ListRemoveByIndexOp(binName string, index int32) Operation
- func ListRemoveByIndexRangeOp(binName string, index, count int32) Operation
- func ListRemoveByValueOp(binName string, value any, returnType ...uint8) (Operation, error)
- func ListSizeOp(binName string) Operation
- func ReadOp(binName string) Operation
- func TDigestAddOp(binName string, value, weight float64) Operation
- func TDigestAddWithCompressionOp(binName string, value, weight float64, compression uint32) Operation
- func TDigestCDFOp(binName string, value float64) Operation
- func TDigestCountOp(binName string) Operation
- func TDigestMaxOp(binName string) Operation
- func TDigestMergeOp(binName string, otherTDigest []byte) Operation
- func TDigestMinOp(binName string) Operation
- func TDigestQuantileOp(binName string, p float64) Operation
- func WriteOp(binName string, value any) (Operation, error)
- func WriteOpCreateOnly(binName string, value any) (Operation, error)
- type PartitionMapEntry
- type PartitionMapInfo
- type Pipeline
- func (p *Pipeline) Delete(ns, set, key string, opts ...WriteOption)
- func (p *Pipeline) Execute(ctx context.Context) ([]PipelineResult, error)
- func (p *Pipeline) Get(ns, set, key string, binNames ...string)
- func (p *Pipeline) Len() int
- func (p *Pipeline) Operate(ns, set, key string, ops []Operation, opts ...WriteOption)
- func (p *Pipeline) Put(ns, set, key string, bins map[string]any, opts ...WriteOption) error
- type PipelineResult
- type Record
- type RecordExistsAction
- type RecordWriteMode
- type ReplicaPolicy
- type ResultCode
- type ScanAllCursor
- type ScanAllResult
- type ScanOption
- type ScanResult
- type TTLAction
- type WriteOption
- func WithClearTTL() WriteOption
- func WithCommitMaster() WriteOption
- func WithCreateOnly() WriteOption
- func WithGeneration(generation uint32) WriteOption
- func WithMaxRetries(n int) WriteOption
- func WithMergeBins() WriteOption
- func WithPreserveTTL() WriteOption
- func WithRecordWriteMode(mode RecordWriteMode) WriteOption
- func WithReplace() WriteOption
- func WithReplaceBins() WriteOption
- func WithSleepBetweenRetries(d time.Duration) WriteOption
- func WithSocketTimeout(d time.Duration) WriteOption
- func WithTTL(seconds uint32) WriteOption
- func WithTotalTimeout(d time.Duration) WriteOption
- func WithWritePolicy(p WritePolicy) WriteOption
- type WritePolicy
Constants ¶
const (
	ListReturnNone  uint8 = 0
	ListReturnValue uint8 = 1
	ListReturnCount uint8 = 2
)
List return types for list remove operations.
const ListFlagAddUnique uint8 = 1 << 0
ListFlagAddUnique rejects the append if the value already exists in the list. Duplicate detection uses exact MessagePack byte equality with no coercion, so values such as "1", 1, and 1.0 are all distinct. Combine with ListFlagNoFail to silently skip duplicates instead of returning an error.
const ListFlagNoFail uint8 = 1 << 1
ListFlagNoFail suppresses the error returned by ListFlagAddUnique when a duplicate is detected. The append is silently skipped and the operation succeeds. Has no effect without ListFlagAddUnique.
Variables ¶
var (
	ErrKeyNotFound        = &Error{Message: "key not found", ResultCode: ResultKeyNotFound}
	ErrKeyExists          = &Error{Message: "key already exists", ResultCode: ResultKeyExists}
	ErrGenerationMismatch = &Error{Message: "generation mismatch", ResultCode: ResultGenerationError}
	ErrServerError        = &Error{Message: "server error", ResultCode: ResultServerError}
	ErrInDoubt            = &Error{Message: "in doubt", ResultCode: ResultInDoubt, InDoubt: true}
	ErrOverloaded         = &Error{Message: "server overloaded", ResultCode: ResultOverloaded}
	ErrFilteredOut        = &Error{Message: "filtered out", ResultCode: ResultFilteredOut}
	ErrTimeout            = &Error{Message: "operation timeout", ResultCode: ResultTimeout}
	ErrClusterDown        = &Error{Message: "cluster down", ResultCode: ResultClusterDown}
	ErrUnsupportedType    = &Error{Message: "unsupported value type", ResultCode: ResultUnsupportedType}
	ErrNoSeeds            = &Error{Message: "no seed addresses provided", ResultCode: ResultNoSeeds}
	ErrClosed             = &Error{Message: "client is closed", ResultCode: ResultClosed}
)
Sentinel errors.
Functions ¶
func DigestForKey ¶
DigestForKey computes RIPEMD-160 digest for a string key in a set.
func PartitionIDForKey ¶
PartitionIDForKey computes the partition ID for a key without a cluster lookup.
Types ¶
type BasePolicy ¶
type BasePolicy struct {
TotalTimeout time.Duration // overall operation deadline; default 1s
SocketTimeout time.Duration // per-socket I/O deadline; default 30s
MaxRetries int // retry attempts on transient failure; default 2 for reads, 0 for writes
SleepBetweenRetries time.Duration // pause between retries; default 500ms
ReplicaPolicy ReplicaPolicy // read routing target; default ReplicaMaster
}
BasePolicy contains common timeout and retry settings shared by all operations.
func DefaultReadPolicy ¶
func DefaultReadPolicy() BasePolicy
DefaultReadPolicy returns a BasePolicy tuned for read operations.
func DefaultWriteBasePolicy ¶
func DefaultWriteBasePolicy() BasePolicy
DefaultWriteBasePolicy returns a BasePolicy tuned for write operations.
type BatchDeleteKey ¶
BatchDeleteKey identifies a key for a batch DELETE operation.
type BatchError ¶
BatchError reports per-key errors from a batch operation.
func (*BatchError) Error ¶
func (e *BatchError) Error() string
Error implements the error interface.
type BatchOperate ¶
type BatchOperate struct {
Namespace string
Set string
Key string
Operations []Operation
TTL uint32
Generation uint32
}
BatchOperate describes a key and operations for a batch OPERATE.
type BatchOption ¶
type BatchOption func(*batchOptions)
BatchOption configures a batch operation.
func WithAllowPartialResults ¶
func WithAllowPartialResults() BatchOption
WithAllowPartialResults allows the batch to return partial results on node-level failures instead of failing the entire batch.
func WithBatchPolicy ¶
func WithBatchPolicy(p BatchPolicy) BatchOption
WithBatchPolicy applies all fields from a BatchPolicy to this batch.
func WithConcurrentNodes ¶
func WithConcurrentNodes(n int) BatchOption
WithConcurrentNodes limits the number of nodes contacted in parallel. 0 (default) means unlimited, 1 means sequential.
func WithFilterExpression ¶
func WithFilterExpression(expr []byte) BatchOption
WithFilterExpression sets a server-side filter expression for the batch. Records that do not match the filter are returned with ResultFilteredOut. Build expressions using protocol.ExpGreater, protocol.ExpIntBin, etc.
type BatchPolicy ¶
type BatchPolicy struct {
BasePolicy
ConcurrentNodes int // max parallel node requests; 0 = unlimited
AllowPartialResults bool // return partial results on node failure
}
BatchPolicy controls behavior of batch operations.
func DefaultBatchPolicy ¶
func DefaultBatchPolicy() BatchPolicy
DefaultBatchPolicy returns a BatchPolicy with sensible defaults.
type BatchWrite ¶
type BatchWrite struct {
Namespace string
Set string
Key string
Bins map[string]any
TTL uint32
Generation uint32
RecordExistsAction RecordExistsAction
RecordWriteMode RecordWriteMode
TTLAction TTLAction
}
BatchWrite describes a record to write in a batch PUT operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the FrogoDB smart client.
It connects to one or more seed nodes, discovers the cluster topology via a background tend loop, and routes each request directly to the node that owns the target partition. Each node maintains a LIFO connection pool for high connection reuse.
func NewWithConfig ¶
NewWithConfig creates a new Client with the given configuration.
func (*Client) BatchDelete ¶
func (c *Client) BatchDelete(ctx context.Context, keys []BatchDeleteKey, opts ...BatchOption) ([]bool, error)
BatchDelete deletes multiple records in parallel, fanning out by owning node. Returns a bool slice in the same order as keys: true if the record existed before deletion, false if not found. On node-level failure the returned error is non-nil; with AllowPartialResults the results slice is still populated for nodes that succeeded.
func (*Client) BatchGet ¶
func (c *Client) BatchGet(ctx context.Context, keys []BatchKey, opts ...BatchOption) ([]*Record, error)
BatchGet retrieves multiple records in parallel, fanning out by owning node. Results are returned in the same order as keys. A nil *Record at position i means the key was not found. On node-level failure the returned error is non-nil; with AllowPartialResults the records slice is still populated for nodes that succeeded.
func (*Client) BatchOperate ¶
func (c *Client) BatchOperate(ctx context.Context, records []BatchOperate, opts ...BatchOption) ([]*Record, error)
BatchOperate executes operations on multiple records in parallel, fanning out by owning node. Results are returned in the same order as records.
func (*Client) BatchPut ¶
func (c *Client) BatchPut(ctx context.Context, records []BatchWrite, opts ...BatchOption) error
BatchPut writes multiple records in parallel, fanning out by owning node. On node-level failure the returned error is non-nil; with AllowPartialResults writes to reachable nodes still succeed.
func (*Client) Count ¶
Count returns the approximate record count for a namespace, optionally filtered by set. Pass an empty set to count all records in the namespace.
func (*Client) CountAll ¶
CountAll returns the approximate logical record count for a namespace/set across all known cluster nodes. It fans out node-local count requests with primary-only filtering enabled so replica copies are not double-counted. When no partition map is available, it falls back to the connected node.
func (*Client) Delete ¶
func (c *Client) Delete(ctx context.Context, ns, set, key string, opts ...WriteOption) (bool, error)
Delete removes a record. Returns true if the record existed before deletion.
func (*Client) Get ¶
Get reads a record. If binNames are specified, only those bins are returned. The client's default ReadPolicy controls timeouts, retries, and replica routing. Use SetReadPolicy to override defaults.
func (*Client) GetPartitionMap ¶
func (c *Client) GetPartitionMap(ctx context.Context) (*PartitionMapInfo, error)
GetPartitionMap requests the full partition map from the server.
func (*Client) LSHDedup ¶
func (c *Client) LSHDedup(ctx context.Context, ns, group, input string, opts ...WriteOption) (string, error)
LSHDedup sends an LSH string deduplication request to the server. The server computes the LSH bucket for the input string and returns the client-visible bucket ID. That ID is the historical lowercase hex value from the first eight bytes of SHA-1 over "<dedup-scope>:<input>"; embedded LSH storage IDs used by the server remain internal. The group parameter selects the dedup scope, with the server treating the query-helper group "lsh_dedup" as namespace-scoped.
func (*Client) LSHVector ¶
func (c *Client) LSHVector( ctx context.Context, ns, group string, vector []float64, opts ...WriteOption, ) (string, error)
LSHVector sends an LSH vector request to the server. The server computes the LSH bucket for the float64 vector and returns the behavioural ID. The group parameter namespaces different LSH indexes. Vector wire format: packed big-endian float64 values.
func (*Client) NodeForKey ¶
NodeForKey returns the address of the node that owns the partition for the given key. Returns "" if no partition map is available.
func (*Client) Nodes ¶ added in v1.2.0
Nodes returns the sorted addresses of currently known cluster nodes.
func (*Client) Operate ¶
func (c *Client) Operate( ctx context.Context, ns, set, key string, ops []Operation, opts ...WriteOption, ) (*Record, error)
Operate executes a list of operations atomically on a single record.
func (*Client) PartitionMapVersion ¶ added in v1.2.0
PartitionMapVersion returns the cached partition map version, or 0 when the client has not received a partition map yet.
func (*Client) PingAddr ¶
PingAddr sends a health-check to a specific node address by opening a temporary connection, sending OpPing, and verifying the result.
func (*Client) Pipeline ¶
Pipeline creates a new Pipeline for batching multiple requests without waiting for individual responses. Queued requests are grouped by target node, and each group is sent on a single TCP connection. See Pipeline.Execute for details.
func (*Client) Put ¶
func (c *Client) Put(ctx context.Context, ns, set, key string, bins map[string]any, opts ...WriteOption) error
Put writes a record with the given bins. Bin values must be int64, int, float64, string, or []byte.
func (*Client) Scan ¶
func (c *Client) Scan( ctx context.Context, ns, set string, limit int, cursor uint32, opts ...ScanOption, ) (*ScanResult, error)
Scan iterates records in a namespace (optionally filtered by set) up to limit. Pass cursor=0 to start a new scan; use ScanResult.NextCursor to continue. A NextCursor of 0 with fewer records than limit indicates the scan is complete.
func (*Client) ScanAll ¶
func (c *Client) ScanAll( ctx context.Context, ns, set string, limit int, cursor ScanAllCursor, opts ...ScanOption, ) (*ScanAllResult, error)
ScanAll scans records across all known cluster nodes using an opaque cursor that tracks per-node local scan progress. When no partition map is available, it falls back to scanning the connected node only.
func (*Client) SetBatchPolicy ¶
func (c *Client) SetBatchPolicy(p BatchPolicy)
SetBatchPolicy sets the default batch policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy before applying per-call options.
func (*Client) SetReadPolicy ¶
func (c *Client) SetReadPolicy(p BasePolicy)
SetReadPolicy sets the default read policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy.
func (*Client) SetWritePolicy ¶
func (c *Client) SetWritePolicy(p WritePolicy)
SetWritePolicy sets the default write policy for the client. It is safe to call while operations are running; each operation uses a single snapshot of the default policy before applying per-call options.
type Cluster ¶ added in v1.2.0
type Cluster struct {
// contains filtered or unexported fields
}
Cluster manages the set of known server nodes and a cached partition map. The tend loop periodically fetches the partition map from any reachable node, detects node joins/leaves, and updates the local copy-on-write node list.
func (*Cluster) NodeForAddress ¶ added in v1.2.0
NodeForAddress returns the node with the given address, or nil.
func (*Cluster) Nodes ¶ added in v1.2.0
Nodes returns a snapshot of the known cluster nodes. The returned slice is a copy; Node values are read-only through their exported methods.
func (*Cluster) PartitionMapSnapshot ¶ added in v1.2.0
func (cl *Cluster) PartitionMapSnapshot() *PartitionMapInfo
PartitionMapSnapshot returns the latest cached partition map (may be nil).
func (*Cluster) PartitionMapVersion ¶ added in v1.2.0
PartitionMapVersion returns the cached partition map version, or 0 when the client has not received a partition map yet.
type CommitPolicy ¶
type CommitPolicy uint8
CommitPolicy controls the replication durability guarantee for write operations.
const (
	// CommitAll waits for local write and replica ACK (default, strongest durability).
	CommitAll CommitPolicy = 0
	// CommitMaster returns after local write, replicates asynchronously.
	// Trades durability for latency: master failure before replication loses the write.
	CommitMaster CommitPolicy = 1
)
type Config ¶
type Config struct {
Seeds []string
TendInterval time.Duration
ConnectionTimeout time.Duration
IdleTimeout time.Duration
PoolSizePerNode int // persistent (base) connections kept warm per node
MaxConnsPerNode int // hard upper bound on total concurrent connections per node (0 = same as PoolSizePerNode)
MaxErrorRate int
ErrorRateWindow time.Duration
// Multiplexing enables multiplexed connections (T17.4). When true, the
// client sends requests without waiting for individual responses. Multiple
// goroutines share a fixed set of connections (no pool semaphore overhead).
Multiplexing bool
MultiplexConnsPerNode int // max connections per node in multiplexed mode (default 16)
MultiplexMinConnsPerNode int
}
Config holds the client configuration.
func DefaultConfig ¶
DefaultConfig returns a Config with sensible defaults for the given seed addresses.
The connection pool uses elastic scaling (inspired by FrogoAI/mq-balancer):
- PoolSizePerNode (64) persistent connections are kept warm in the pool
- MaxConnsPerNode (256) is the hard ceiling under burst load
- Temporary connections beyond PoolSizePerNode are created on demand and auto-close when returned (not kept in pool)
func GetConfigFromEnv ¶ added in v1.2.0
GetConfigFromEnv returns a Config built from FDB_* environment variables.
Unset values use the same defaults as DefaultConfig, with FDB_SEEDS defaulting to localhost:3000 for local development. A variable that is set to an invalid value causes an error that names the variable and wraps the parse or validation failure.
type Error ¶
type Error struct {
Message string
ResultCode ResultCode
Node string
InDoubt bool
Iteration int
// contains filtered or unexported fields
}
Error is a rich error returned by client operations. It carries the result code, target node address, in-doubt flag for writes whose outcome is unknown, and the retry iteration count.
func (*Error) Matches ¶
func (e *Error) Matches(codes ...ResultCode) bool
Matches returns true if the error's ResultCode is any of the given codes.
type Node ¶ added in v1.2.0
type Node struct {
// contains filtered or unexported fields
}
Node represents a single server node in the cluster. Each node owns a LIFO connection pool for request multiplexing and a circuit breaker that fast-fails requests when the node is unreachable.
type Operation ¶
Operation represents an operation for OPERATE requests.
It is the public client name for the shared protocol operation wire shape.
func AppendStringOp ¶
AppendStringOp creates an APPEND operation on a String bin.
func BloomAddOp ¶
BloomAddOp creates an operation that adds an element to a counting bloom filter bin. If the bin does not exist, the server creates a default bloom.
func BloomInitOp ¶
BloomInitOp creates an operation that initializes a new counting bloom filter with the given capacity and false positive rate. n is the expected number of items; p is the target false positive rate (e.g. 0.01).
func BloomRemoveOp ¶
BloomRemoveOp creates an operation that removes an element from a counting bloom filter bin by decrementing the relevant counters.
func BloomResetOp ¶
BloomResetOp creates an operation that clears all counters in a bloom filter bin while preserving the filter configuration.
func BloomTestOp ¶
BloomTestOp creates an operation that tests whether an element is a member of a bloom filter bin. This is a read-only operation — the bin state is not modified. The result is returned as a bool in the response record's bin map.
func HLLAddOp ¶
HLLAddOp creates an operation that adds one or more elements to an HLL bin. Multiple values are batched into a single operation, amortizing the deserialization/serialization cost on the server (one FromBytes + one ToBytes regardless of how many elements are added).
If the bin does not exist, the server creates a default HLL (log2m=14).
func HLLAddWithConfigOp ¶
HLLAddWithConfigOp creates an operation that adds elements to an HLL bin, using custom indexBitCount and regwidth on first create. If the bin already exists, the config is ignored and the existing HLL settings are preserved.
Args wire format: [indexBitCount:1B][regwidth:1B][count:4B][len:4B][val]...
Size reference (indexBitCount → bytes):
10 → ~1 KB (3.25% error)
14 → ~16 KB (0.81% error, default)
func HLLCountOp ¶
HLLCountOp creates an operation that returns the estimated cardinality of an HLL bin. This is a read-only operation — the bin state is not modified. The result is returned as an int64 in the response record's bin map.
func HLLInitOp ¶
HLLInitOp creates an operation that initializes a new HLL bin with the given precision parameters. indexBitCount controls accuracy (typically 4-18, default 14). regwidth controls register width (typically 6).
func HLLIntersectCountOp ¶
HLLIntersectCountOp creates an operation that estimates the intersection cardinality between the target bin's HLL and another HLL (provided as raw serialized bytes). Uses the inclusion-exclusion formula: |A ∩ B| = |A| + |B| - |A ∪ B|. The bin state is not modified.
func HLLUnionCountOp ¶
HLLUnionCountOp creates an operation that unions the target bin's HLL with one or more other HLLs (provided as raw serialized bytes) and returns the combined cardinality. The bin state is not modified.
func HLLUnionOp ¶
HLLUnionOp creates an operation that merges another HLL (provided as raw serialized bytes) into the target bin.
func IncrFloatOp ¶
IncrFloatOp creates an INCR operation on a Float bin with the given delta.
func ListAppendItemsOp ¶
ListAppendItemsOp creates an operation that appends multiple values to a list bin using one server-side list operation. The values are serialized as a MessagePack array. Optional flags use the same semantics as ListAppendOp.
func ListAppendOp ¶
ListAppendOp creates an operation that appends a value to a list bin. The value is serialized using MessagePack. If the bin does not exist, the server creates a new list. Optional flags control write behavior — pass ListFlagAddUnique|ListFlagNoFail for idempotent appends. With no flags the default is writeFlags=0 (unconditional append, backward compatible).
func ListGetByIndexOp ¶
ListGetByIndexOp creates an operation that returns the element at the given index. Negative index counts from the end. This is a read-only operation.
func ListInsertOp ¶
ListInsertOp creates an operation that inserts a value at the given index. Negative index counts from the end of the list. Returns an error if the value cannot be serialized.
func ListRemoveByIndexOp ¶
ListRemoveByIndexOp creates an operation that removes the element at the given index and returns the removed value. Negative index counts from the end.
func ListRemoveByIndexRangeOp ¶
ListRemoveByIndexRangeOp creates an operation that removes count elements starting at index and returns the removed values. Negative index counts from the end.
func ListRemoveByValueOp ¶
ListRemoveByValueOp creates an operation that removes every top-level list element whose MessagePack encoding exactly matches value. The default return type is ListReturnValue for parity with the existing remove helpers. Pass one of ListReturnNone, ListReturnValue, or ListReturnCount to override it.
func ListSizeOp ¶
ListSizeOp creates an operation that returns the number of elements in a list bin. This is a read-only operation.
func TDigestAddOp ¶
TDigestAddOp creates an operation that adds a weighted sample to a t-digest bin. If the bin does not exist, the server creates a default t-digest. weight is converted to a uint64 count on the server (minimum 1).
func TDigestAddWithCompressionOp ¶
func TDigestAddWithCompressionOp(binName string, value, weight float64, compression uint32) Operation
TDigestAddWithCompressionOp creates an operation that adds a weighted sample to a t-digest bin, using custom compression on first create. If the bin already exists, the compression param is ignored.
Args wire format: [compression:4B][value:8B][weight:8B]
Size reference (compression → approx bytes):
25 → ~1 KB (matches HLL at indexBitCount=10)
100 → ~3 KB (default)
func TDigestCDFOp ¶
TDigestCDFOp creates an operation that returns the estimated cumulative distribution function value for the given data point. This is a read-only operation. The result is the fraction of values less than or equal to value.
func TDigestCountOp ¶
TDigestCountOp creates an operation that returns the total count of samples in a t-digest bin. This is a read-only operation.
func TDigestMaxOp ¶
TDigestMaxOp creates an operation that returns the maximum value in a t-digest bin. This is a read-only operation.
func TDigestMergeOp ¶
TDigestMergeOp creates an operation that merges another t-digest (provided as raw serialized bytes) into the target bin.
func TDigestMinOp ¶
TDigestMinOp creates an operation that returns the minimum value in a t-digest bin. This is a read-only operation.
func TDigestQuantileOp ¶
TDigestQuantileOp creates an operation that returns the estimated value at the given quantile (0.0–1.0). This is a read-only operation.
type PartitionMapEntry ¶
PartitionMapEntry holds master and replica service addresses for a single partition.
type PartitionMapInfo ¶
type PartitionMapInfo struct {
Version int64
NodeCount int
RosterHash uint64
Entries []PartitionMapEntry
}
PartitionMapInfo represents a decoded partition map from the server.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline batches multiple requests and sends them on a single TCP connection without waiting for individual responses (request pipelining). Responses are read in order after all requests are written, eliminating per-request round-trip latency.
Requests targeting different nodes are automatically grouped and pipelined per node.
Pipeline is NOT safe for concurrent use.
func (*Pipeline) Delete ¶
func (p *Pipeline) Delete(ns, set, key string, opts ...WriteOption)
Delete queues a DELETE request in the pipeline.
func (*Pipeline) Execute ¶
func (p *Pipeline) Execute(ctx context.Context) ([]PipelineResult, error)
Execute sends all queued requests and reads their responses. Requests targeting different nodes are grouped and pipelined per node. Results are returned in the same order that requests were added.
After Execute returns, the pipeline is empty and may be reused.
type PipelineResult ¶
type PipelineResult struct {
// Record is populated for GET and OPERATE responses with ResultOK.
// Nil for PUT and DELETE responses.
Record *Record
// Err is non-nil when the request failed (network error or server result code).
Err error
}
PipelineResult holds the outcome of a single request within a Pipeline.
type RecordExistsAction ¶
type RecordExistsAction uint8
RecordExistsAction determines only the existence precondition for a write.
It is intentionally orthogonal to the T26 write-mode contract: merge/update vs full-record replace must be expressed separately because the existing "replace" action already means "require that the record exists".
const (
	// RecordUpdate allows the write on both missing and existing records
	// (subject to the selected write mode).
	RecordUpdate RecordExistsAction = 0
	// RecordCreateOnly fails with ErrKeyExists if the record already exists.
	RecordCreateOnly RecordExistsAction = 1
	// RecordReplace fails with ErrKeyNotFound if the record does not exist.
	// Despite the name, this is an existence check, not the future T26
	// full-record replace mode.
	RecordReplace RecordExistsAction = 2
)
type RecordWriteMode ¶
type RecordWriteMode uint8
RecordWriteMode controls how PUT-style writes treat bins that are not present in the request payload. This applies to Put and BatchPut only.
const (
	// RecordWriteModeReplace keeps the legacy behavior: supplied bins become the
	// full record value.
	RecordWriteModeReplace RecordWriteMode = 0
	// RecordWriteModeMerge requests merge/update semantics: overlay supplied
	// bins onto the existing record and preserve untouched bins.
	RecordWriteModeMerge RecordWriteMode = 1
)
type ReplicaPolicy ¶
type ReplicaPolicy uint8
ReplicaPolicy determines which node a read operation targets.
const (
	// ReplicaMaster reads from the partition master (default).
	ReplicaMaster ReplicaPolicy = 0
	// ReplicaMasterOrReplica reads from master or replica.
	ReplicaMasterOrReplica ReplicaPolicy = 1
)
type ResultCode ¶
type ResultCode uint8
ResultCode identifies the outcome of a client or server operation.
const ( // ResultOK indicates the operation completed successfully. ResultOK ResultCode = 0x00 // ResultKeyNotFound indicates the requested key does not exist. ResultKeyNotFound ResultCode = 0x02 // ResultKeyExists indicates a create-only write failed because the key already exists. ResultKeyExists ResultCode = 0x05 // ResultGenerationError indicates an optimistic-lock failure due to generation mismatch. ResultGenerationError ResultCode = 0x06 ResultPartitionUnavailable ResultCode = 0x14 // ResultOverloaded indicates the server rejected the request due to backpressure (T17.6). ResultOverloaded ResultCode = 0x18 // ResultFilteredOut indicates the record was excluded by a filter expression. ResultFilteredOut ResultCode = 0x1A // ResultInDoubt indicates the write outcome is unknown (may or may not have succeeded). ResultInDoubt ResultCode = 0x28 // ResultServerError indicates an unspecified server-side error. ResultServerError ResultCode = 0x50 // ResultTimeout indicates the operation exceeded its deadline. ResultTimeout ResultCode = 0xF0 // ResultClusterDown indicates no cluster nodes are reachable. ResultClusterDown ResultCode = 0xF1 // ResultUnsupportedType indicates the value type cannot be serialized. ResultUnsupportedType ResultCode = 0xF2 // ResultClosed indicates the client has been closed. ResultClosed ResultCode = 0xF3 // ResultNoSeeds indicates no seed addresses were provided. ResultNoSeeds ResultCode = 0xF4 )
type ScanAllCursor ¶
type ScanAllCursor string
ScanAllCursor is an opaque pagination token for distributed scans.
type ScanAllResult ¶
type ScanAllResult struct {
Records []Record
NextCursor ScanAllCursor
}
ScanAllResult holds one page of distributed scan records and the cursor to resume the cluster-wide scan.
type ScanOption ¶
type ScanOption func(*scanConfig)
ScanOption configures a scan operation.
func WithScanFilter ¶
func WithScanFilter(expr []byte) ScanOption
WithScanFilter sets a server-side filter expression for the scan. Records that do not match the filter are skipped. Build expressions using protocol.ExpGreater, protocol.ExpIntBin, etc.
type ScanResult ¶
ScanResult holds the records returned by a scan and a cursor for pagination.
type TTLAction ¶
type TTLAction uint8
TTLAction controls how PUT-style writes treat record expiration metadata. This applies to Put and BatchPut only.
const ( // TTLActionClear keeps the legacy behavior: TTL 0 means no expiration. TTLActionClear TTLAction = 0 // TTLActionPreserve keeps the existing TTL when the record already exists. TTLActionPreserve TTLAction = 1 // TTLActionSet applies the TTL field as an explicit TTL value. TTLActionSet TTLAction = 2 )
type WriteOption ¶
type WriteOption func(*writeOptions)
WriteOption configures a write operation.
func WithClearTTL ¶
func WithClearTTL() WriteOption
WithClearTTL removes expiration from the written record.
func WithCommitMaster ¶
func WithCommitMaster() WriteOption
WithCommitMaster sets the commit policy so that the write returns as soon as the master's local write completes; replication happens asynchronously. This reduces write latency at the cost of durability: if the master fails before replication completes, the write is lost.
func WithCreateOnly ¶
func WithCreateOnly() WriteOption
WithCreateOnly makes the write fail with ErrKeyExists if the record already exists.
func WithGeneration ¶
func WithGeneration(generation uint32) WriteOption
WithGeneration sets the expected generation for optimistic locking. The write fails with ErrGenerationMismatch if the server-side generation differs.
func WithMaxRetries ¶
func WithMaxRetries(n int) WriteOption
WithMaxRetries sets the maximum number of retry attempts for the write operation.
func WithMergeBins ¶
func WithMergeBins() WriteOption
WithMergeBins requests merge/update semantics for PUT/BatchPut.
func WithPreserveTTL ¶
func WithPreserveTTL() WriteOption
WithPreserveTTL keeps the existing record TTL when the record already exists. New records are created with no expiration.
func WithRecordWriteMode ¶
func WithRecordWriteMode(mode RecordWriteMode) WriteOption
WithRecordWriteMode sets the PUT/BatchPut bin merge policy.
func WithReplace ¶
func WithReplace() WriteOption
WithReplace makes the write fail with ErrKeyNotFound if the record does not exist.
This preserves the existing "require record existence" meaning. T26's explicit full-record replace mode is a separate option (see WithReplaceBins) and must remain so, so that this option is not overloaded with two different semantics.
func WithReplaceBins ¶
func WithReplaceBins() WriteOption
WithReplaceBins requests explicit full-record replacement for PUT/BatchPut.
func WithSleepBetweenRetries ¶
func WithSleepBetweenRetries(d time.Duration) WriteOption
WithSleepBetweenRetries sets the pause duration between retry attempts.
func WithSocketTimeout ¶
func WithSocketTimeout(d time.Duration) WriteOption
WithSocketTimeout sets the per-socket I/O deadline for the write operation.
func WithTTL ¶
func WithTTL(seconds uint32) WriteOption
WithTTL sets the time-to-live in seconds.
A non-zero value means "set this TTL". Zero keeps the legacy "clear TTL / no expiration" behavior so existing callers retain their current contract.
func WithTotalTimeout ¶
func WithTotalTimeout(d time.Duration) WriteOption
WithTotalTimeout sets the overall operation deadline for the write operation.
func WithWritePolicy ¶
func WithWritePolicy(p WritePolicy) WriteOption
WithWritePolicy applies all fields from a WritePolicy to the write options.
type WritePolicy ¶
type WritePolicy struct {
BasePolicy
Generation uint32 // expected generation for optimistic locking (0 = ignore)
TTL uint32 // TTL in seconds when TTLAction == TTLActionSet
RecordExistsAction RecordExistsAction // behavior when record already exists
RecordWriteMode RecordWriteMode // Put/BatchPut only: replace (legacy) or merge
TTLAction TTLAction // Put/BatchPut only: clear (legacy), preserve, or set TTL
CommitPolicy CommitPolicy // replication durability: CommitAll (default) or CommitMaster
}
WritePolicy controls the behavior of write operations (Put, Delete, Operate).
func DefaultWritePolicy ¶
func DefaultWritePolicy() WritePolicy
DefaultWritePolicy returns a WritePolicy with sensible defaults.
Source Files ¶
- batch.go
- batch_delete_node.go
- batch_get_node.go
- batch_node.go
- batch_operate_node.go
- batch_policy.go
- batch_put_node.go
- bloom.go
- circuit.go
- client.go
- cluster.go
- config.go
- connection.go
- count.go
- encoding.go
- error.go
- errors.go
- hll.go
- list.go
- lsh.go
- multiplex.go
- node.go
- partition.go
- partition_hash.go
- partition_map_info.go
- pipeline.go
- policy.go
- pool.go
- routing.go
- scan.go
- scan_distributed.go
- tdigest.go
- transport.go
- validation.go
- wire.go
- write_flags.go
- write_policy.go