Documentation ¶
Overview ¶
Package backend provides interfaces and types for implementing cache backends. It defines the contract that all cache backends must follow, including operations for storing, retrieving, and managing cached items. The package supports generic backend implementations with type constraints to ensure type safety.
The main interface IBackend provides methods for:
- Getting and setting cache items
- Managing cache capacity and item count
- Removing items and clearing the cache
- Listing items with optional filters
Backend implementations must satisfy the IBackendConstrain type constraint. The constructors in this package return InMemory, Redis, RedisCluster, and DistMemory backends.
Index ¶
- Variables
- func ApplyOptions[T IBackendConstrain](backend *T, options ...Option[T])
- type Chaos
- type ConsistencyLevel
- type DistHTTPAuth
- type DistHTTPLimits
- type DistHTTPTransport
- func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport
- func NewDistHTTPTransportWithAuth(limits DistHTTPLimits, auth DistHTTPAuth, resolver func(string) (string, bool)) *DistHTTPTransport
- func NewDistHTTPTransportWithLimits(limits DistHTTPLimits, resolver func(string) (string, bool)) *DistHTTPTransport
- func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error)
- func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error)
- func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error
- func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error
- func (t *DistHTTPTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error
- func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error
- func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
- func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error)
- type DistMemory
- func (dm *DistMemory) AddPeer(address string)
- func (dm *DistMemory) BuildMerkleTree() *MerkleTree
- func (dm *DistMemory) Capacity() int
- func (dm *DistMemory) Clear(ctx context.Context) error
- func (dm *DistMemory) Count(_ context.Context) int
- func (dm *DistMemory) DebugDropLocal(key string)
- func (dm *DistMemory) DebugInject(it *cache.Item)
- func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID
- func (dm *DistMemory) DistMembershipSnapshot() map[string]any
- func (dm *DistMemory) Drain(_ context.Context) error
- func (dm *DistMemory) EventBus() *eventbus.Bus
- func (dm *DistMemory) Get(ctx context.Context, key string) (*cache.Item, bool)
- func (dm *DistMemory) IsDraining() bool
- func (dm *DistMemory) IsOwner(key string) bool
- func (dm *DistMemory) LatencyHistograms() map[string][]uint64
- func (dm *DistMemory) LifecycleContext() context.Context
- func (dm *DistMemory) List(_ context.Context, _ ...IFilter) ([]*cache.Item, error)
- func (dm *DistMemory) ListKeys(ctx context.Context, pattern string, maxResults int) (ListKeysResult, error)
- func (dm *DistMemory) LocalContains(key string) bool
- func (dm *DistMemory) LocalNodeAddr() string
- func (dm *DistMemory) LocalNodeID() cluster.NodeID
- func (dm *DistMemory) Membership() *cluster.Membership
- func (dm *DistMemory) Metrics() DistMetrics
- func (dm *DistMemory) Remove(ctx context.Context, keys ...string) error
- func (dm *DistMemory) RemovePeer(address string)
- func (dm *DistMemory) Ring() *cluster.Ring
- func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error
- func (dm *DistMemory) SetCapacity(capacity int)
- func (dm *DistMemory) SetLocalNode(node *cluster.Node)
- func (dm *DistMemory) SetTransport(t DistTransport)
- func (dm *DistMemory) Stop(ctx context.Context) error
- func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error
- func (dm *DistMemory) Touch(_ context.Context, key string) bool
- type DistMemoryOption
- func WithDistCapacity(capacity int) DistMemoryOption
- func WithDistChaos(c *Chaos) DistMemoryOption
- func WithDistGossipInterval(d time.Duration) DistMemoryOption
- func WithDistHTTPAuth(auth DistHTTPAuth) DistMemoryOption
- func WithDistHTTPLimits(limits DistHTTPLimits) DistMemoryOption
- func WithDistHeartbeat(interval, suspectAfter, deadAfter time.Duration) DistMemoryOption
- func WithDistHeartbeatSample(k int) DistMemoryOption
- func WithDistHintMaxBytes(b int64) DistMemoryOption
- func WithDistHintMaxPerNode(n int) DistMemoryOption
- func WithDistHintMaxTotal(n int) DistMemoryOption
- func WithDistHintReplayInterval(d time.Duration) DistMemoryOption
- func WithDistHintTTL(d time.Duration) DistMemoryOption
- func WithDistIndirectProbes(k int, timeout time.Duration) DistMemoryOption
- func WithDistListKeysCap(n int) DistMemoryOption
- func WithDistLogger(logger *slog.Logger) DistMemoryOption
- func WithDistMembership(m *cluster.Membership, node *cluster.Node) DistMemoryOption
- func WithDistMerkleAdaptiveBackoff(maxFactor int) DistMemoryOption
- func WithDistMerkleAutoSync(interval time.Duration) DistMemoryOption
- func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption
- func WithDistMerkleChunkSize(n int) DistMemoryOption
- func WithDistMeterProvider(mp metric.MeterProvider) DistMemoryOption
- func WithDistNode(id, address string) DistMemoryOption
- func WithDistParallelReads(enable bool) DistMemoryOption
- func WithDistReadConsistency(l ConsistencyLevel) DistMemoryOption
- func WithDistReadRepairBatch(interval time.Duration, maxBatchSize int) DistMemoryOption
- func WithDistRebalanceBatchSize(n int) DistMemoryOption
- func WithDistRebalanceInterval(d time.Duration) DistMemoryOption
- func WithDistRebalanceMaxConcurrent(n int) DistMemoryOption
- func WithDistRemovalGrace(d time.Duration) DistMemoryOption
- func WithDistReplicaDiffMaxPerTick(n int) DistMemoryOption
- func WithDistReplication(n int) DistMemoryOption
- func WithDistSeeds(addresses []string) DistMemoryOption
- func WithDistShardCount(n int) DistMemoryOption
- func WithDistTombstoneSweep(interval time.Duration) DistMemoryOption
- func WithDistTombstoneTTL(d time.Duration) DistMemoryOption
- func WithDistTracerProvider(tp trace.TracerProvider) DistMemoryOption
- func WithDistTransport(t DistTransport) DistMemoryOption
- func WithDistVirtualNodes(n int) DistMemoryOption
- func WithDistWriteConsistency(l ConsistencyLevel) DistMemoryOption
- type DistMetrics
- type DistTransport
- type GossipMember
- type IBackend
- func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[DistMemory], error)
- func NewDistMemoryWithConfig(ctx context.Context, cfg any, opts ...DistMemoryOption) (IBackend[DistMemory], error)
- func NewInMemory(opts ...Option[InMemory]) (IBackend[InMemory], error)
- func NewRedis(redisOptions ...Option[Redis]) (IBackend[Redis], error)
- func NewRedisCluster(redisOptions ...Option[RedisCluster]) (IBackend[RedisCluster], error)
- type IBackendConstrain
- type IFilter
- type InMemory
- func (cacheBackend *InMemory) Capacity() int
- func (cacheBackend *InMemory) Clear(ctx context.Context) error
- func (cacheBackend *InMemory) Count(_ context.Context) int
- func (cacheBackend *InMemory) Get(_ context.Context, key string) (*cache.Item, bool)
- func (cacheBackend *InMemory) List(_ context.Context, filters ...IFilter) ([]*cache.Item, error)
- func (cacheBackend *InMemory) Remove(ctx context.Context, keys ...string) error
- func (cacheBackend *InMemory) Set(_ context.Context, item *cache.Item) error
- func (cacheBackend *InMemory) SetCapacity(capacity int)
- func (cacheBackend *InMemory) Touch(_ context.Context, key string) bool
- type InProcessTransport
- func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error)
- func (t *InProcessTransport) ForwardGet(_ context.Context, nodeID, key string) (*cache.Item, bool, error)
- func (t *InProcessTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error
- func (t *InProcessTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error
- func (t *InProcessTransport) Gossip(_ context.Context, targetNodeID string, members []GossipMember) error
- func (t *InProcessTransport) Health(_ context.Context, nodeID string) error
- func (t *InProcessTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
- func (t *InProcessTransport) ListKeys(_ context.Context, nodeID, pattern string) ([]string, error)
- func (t *InProcessTransport) Register(b *DistMemory)
- func (t *InProcessTransport) Unregister(id string)
- type ListKeysResult
- type MerkleTree
- type Option
- func WithCapacity[T IBackendConstrain](capacity int) Option[T]
- func WithClusterKeysSetName[T RedisCluster](keysSetName string) Option[RedisCluster]
- func WithClusterSerializer[T RedisCluster](ser serializer.ISerializer) Option[RedisCluster]
- func WithKeysSetName[T Redis](keysSetName string) Option[Redis]
- func WithRedisClient[T Redis](client *redis.Client) Option[Redis]
- func WithRedisClusterClient[T RedisCluster](client *redis.ClusterClient) Option[RedisCluster]
- func WithSerializer[T Redis](backendSerializer serializer.ISerializer) Option[Redis]
- type Redis
- func (cacheBackend *Redis) Capacity() int
- func (cacheBackend *Redis) Clear(ctx context.Context) error
- func (cacheBackend *Redis) Count(ctx context.Context) int
- func (cacheBackend *Redis) Get(ctx context.Context, key string) (*cache.Item, bool)
- func (cacheBackend *Redis) List(ctx context.Context, filters ...IFilter) ([]*cache.Item, error)
- func (cacheBackend *Redis) Remove(ctx context.Context, keys ...string) error
- func (cacheBackend *Redis) Set(ctx context.Context, item *cache.Item) error
- func (cacheBackend *Redis) SetCapacity(capacity int)
- type RedisCluster
- func (cacheBackend *RedisCluster) Capacity() int
- func (cacheBackend *RedisCluster) Clear(ctx context.Context) error
- func (cacheBackend *RedisCluster) Count(ctx context.Context) int
- func (cacheBackend *RedisCluster) Get(ctx context.Context, key string) (*cache.Item, bool)
- func (cacheBackend *RedisCluster) List(ctx context.Context, filters ...IFilter) ([]*cache.Item, error)
- func (cacheBackend *RedisCluster) Remove(ctx context.Context, keys ...string) error
- func (cacheBackend *RedisCluster) Set(ctx context.Context, item *cache.Item) error
- func (cacheBackend *RedisCluster) SetCapacity(capacity int)
- type SortOrderFilter
Constants ¶
This section is empty.
Variables ¶
var ErrChaosDrop = ewrap.New("dist transport: chaos drop")
ErrChaosDrop is the sentinel a chaos-wrapped transport returns when a configured DropRate triggers on a request. Tests use errors.Is(err, ErrChaosDrop) to confirm the fault path fired rather than a real transport failure.
Functions ¶
func ApplyOptions ¶
func ApplyOptions[T IBackendConstrain](backend *T, options ...Option[T])
ApplyOptions applies the given options to the given backend.
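As a rough illustration of the generic functional-options pattern behind this signature, the sketch below uses local stand-ins. It assumes Option[T] is a function type taking *T (DistMemoryOption is documented as func(*DistMemory), but the shape of Option[T] is not shown on this page); inMemory, backendConstraint, option, and withCapacity are hypothetical names.

```go
package main

// inMemory is a hypothetical backend type with a single tunable field.
type inMemory struct{ capacity int }

// backendConstraint stands in for IBackendConstrain: a union of the backend
// types that options may target (only one type here).
type backendConstraint interface{ inMemory }

// option mutates a backend of type T. This is the assumed shape of Option[T].
type option[T backendConstraint] func(*T)

// applyOptions runs each option against the backend, in argument order.
func applyOptions[T backendConstraint](backend *T, options ...option[T]) {
	for _, opt := range options {
		opt(backend)
	}
}

// withCapacity returns an option that sets the capacity field.
func withCapacity(capacity int) option[inMemory] {
	return func(b *inMemory) { b.capacity = capacity }
}
```

In this sketch options are applied in order, so a later option overrides an earlier one that touches the same field.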
Types ¶
type Chaos ¶ added in v0.8.4
type Chaos struct {
// contains filtered or unexported fields
}
Chaos configures fault injection for resilience testing of the dist transport. Construct via NewChaos, configure via the SetDropRate / SetLatency helpers (atomic — safe to call from running tests), then pass to WithDistChaos when constructing the DistMemory under test.
Disabled by default (zero values). When set on a DistMemory, every dist transport call goes through the chaos wrapper; the overhead is two atomic loads on the no-fault path so the cost stays trivial even when chaos is enabled but inactive.
Chaos is for tests only. The hooks DO NOT belong in production — there's no safety mechanism preventing an operator from pointing a production cluster at a Chaos with DropRate=1.0. Don't.
func NewChaos ¶ added in v0.8.4
func NewChaos() *Chaos
NewChaos returns a zeroed Chaos. Configure via SetDropRate / SetLatency before or after passing to WithDistChaos; mutation is atomic and safe to interleave with running tests.
func (*Chaos) Drops ¶ added in v0.8.4
Drops returns the count of transport calls dropped since construction. Useful for test assertions of the shape "assert chaos.Drops() > 0 after running the load".
func (*Chaos) Latencies ¶ added in v0.8.4
Latencies returns the count of transport calls that had latency injected since construction.
func (*Chaos) SetDropRate ¶ added in v0.8.4
SetDropRate configures the probability (0.0..1.0) that a single transport call is dropped instead of forwarded. Pass 0 to disable. Values outside [0, 1] are clamped.
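A minimal sketch of how a clamped, atomically stored drop rate could work. The real Chaos fields are unexported, so the chaos struct, its field layout, and the shouldDrop helper are assumptions made for illustration only:

```go
package main

import (
	"math"
	"sync/atomic"
)

// chaos is a hypothetical stand-in for the Chaos type.
type chaos struct {
	dropRateBits atomic.Uint64 // float64 bits of the drop probability
	drops        atomic.Int64  // number of calls dropped so far
}

// setDropRate clamps p to [0, 1] and stores it atomically.
func (c *chaos) setDropRate(p float64) {
	if p < 0 {
		p = 0
	}
	if p > 1 {
		p = 1
	}
	c.dropRateBits.Store(math.Float64bits(p))
}

// dropRate loads the current drop probability.
func (c *chaos) dropRate() float64 {
	return math.Float64frombits(c.dropRateBits.Load())
}

// shouldDrop decides a single call given a uniform sample in [0, 1) and
// counts the drop when it fires.
func (c *chaos) shouldDrop(sample float64) bool {
	if sample < c.dropRate() {
		c.drops.Add(1)
		return true
	}
	return false
}
```

Passing the random sample in as an argument keeps the decision deterministic in tests; a caller would typically supply rand.Float64().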
type ConsistencyLevel ¶ added in v0.2.0
type ConsistencyLevel int
ConsistencyLevel defines read/write consistency semantics.
const (
	// ConsistencyOne returns after a single owner success (fast, may be stale).
	ConsistencyOne ConsistencyLevel = iota
	// ConsistencyQuorum waits for majority (floor(n/2)+1).
	ConsistencyQuorum
	// ConsistencyAll waits for all owners.
	ConsistencyAll
)
func (ConsistencyLevel) String ¶ added in v0.6.0
func (l ConsistencyLevel) String() string
String returns the human-readable form for logs and span attributes. Unknown values render as `consistency(<int>)` rather than panicking so a corrupted/forwards-compatible value still produces useful telemetry.
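The following sketch shows one way the level semantics and the non-panicking String fallback could fit together. It uses a local consistencyLevel type; the names "one", "quorum", and "all", and the requiredAcks helper, are assumptions rather than the package's actual output or API.

```go
package main

import "fmt"

// consistencyLevel is a local stand-in for ConsistencyLevel.
type consistencyLevel int

const (
	consistencyOne consistencyLevel = iota
	consistencyQuorum
	consistencyAll
)

// String renders known levels by name and unknown values as
// consistency(<int>). The known names here are assumed.
func (l consistencyLevel) String() string {
	switch l {
	case consistencyOne:
		return "one"
	case consistencyQuorum:
		return "quorum"
	case consistencyAll:
		return "all"
	default:
		return fmt.Sprintf("consistency(%d)", int(l))
	}
}

// requiredAcks returns how many of n owners must succeed at the given level.
// Treating unknown levels like consistencyOne is an assumption of this sketch.
func requiredAcks(l consistencyLevel, n int) int {
	switch l {
	case consistencyQuorum:
		return n/2 + 1 // floor(n/2)+1 via integer division
	case consistencyAll:
		return n
	default:
		return 1
	}
}
```

With three owners, quorum needs two acknowledgements; with four, it needs three.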
type DistHTTPAuth ¶ added in v0.4.2
type DistHTTPAuth struct {
// Token is the shared bearer string. When set, the server requires
// `Authorization: Bearer <token>` on every request (unless
// ServerVerify overrides) and the auto-created client sends the
// same header (unless ClientSign overrides).
Token string
// ServerVerify (optional) inspects each incoming request and returns
// non-nil to reject with HTTP 401. Use for JWT, OAuth introspection,
// path-based exemptions, etc. When set it replaces the Token check.
ServerVerify func(fiber.Ctx) error
// ClientSign (optional) decorates each outgoing request before send.
// Use for HMAC signing, mTLS-derived headers, etc. When set it
// replaces the default `Authorization: Bearer <token>` header.
ClientSign func(*http.Request) error
// AllowAnonymousInbound permits this node to accept inbound requests
// without authentication when no inbound verifier is configured
// (neither Token nor ServerVerify) but ClientSign is. Without this
// flag, that combination is rejected at construction time to prevent
// silent inbound bypass when an operator wires only one side of an
// HMAC scheme. Setting this flag is an explicit acknowledgment that
// inbound traffic is protected at a layer below this server (L4
// firewall, service mesh mTLS, etc.).
AllowAnonymousInbound bool
}
DistHTTPAuth configures authentication for the dist HTTP server (inbound) and the auto-created HTTP client (outbound). The two sides are independent: ServerVerify+Token govern inbound validation, while ClientSign+Token govern outbound signing. Zero-value disables both.
Symmetric clusters use Token alone: every node sets the same string, the server validates incoming `Authorization: Bearer <token>` via constant-time compare, and the client sends the same header.
ServerVerify (inbound) and ClientSign (outbound) are escape hatches for JWT, mTLS-derived identity, HMAC signing, etc. When set, each fully replaces the corresponding Token-based default on its side.
Asymmetric configs are valid but require explicit intent. In particular, setting ClientSign without any inbound verifier (Token or ServerVerify) is dangerous — the node would sign outbound traffic while accepting unauthenticated inbound. NewDistMemory rejects that shape with sentinel.ErrInsecureAuthConfig. Operators who genuinely want signed-out / open-in (e.g. inbound is gated by an L4 firewall or service mesh) must opt in via AllowAnonymousInbound.
type DistHTTPLimits ¶ added in v0.4.2
type DistHTTPLimits struct {
// BodyLimit caps inbound request body bytes (server-side).
BodyLimit int
// ResponseLimit caps inbound response body bytes (client-side).
ResponseLimit int64
// ReadTimeout is the server read deadline.
ReadTimeout time.Duration
// WriteTimeout is the server write deadline.
WriteTimeout time.Duration
// IdleTimeout is the keep-alive idle timeout (server-side).
IdleTimeout time.Duration
// Concurrency is the maximum number of concurrent in-flight handlers.
Concurrency int
// ClientTimeout is the per-request deadline for the dist HTTP client.
ClientTimeout time.Duration
// TLSConfig (when non-nil) enables TLS for both the dist HTTP server
// (wraps the TCP listener with tls.NewListener) and the auto-created
// HTTP client (sets Transport.TLSClientConfig). Operators must apply
// the same config to every node; mismatched roots/certs cause peer
// handshakes to fail. The same struct is shared by server and client
// because in this codebase a node is both — but tests / advanced
// callers can fork the value and assign different ones if needed.
//
// For mTLS, set both Certificates (server cert) and ClientCAs +
// ClientAuth=tls.RequireAndVerifyClientCert. The auto-client uses
// the same cert as its client cert via Certificates[0].
TLSConfig *tls.Config
// CompressionThreshold opts the dist HTTP transport into gzip
// compression of Set request bodies whose serialized payload size
// exceeds this many bytes. The client sets `Content-Encoding:
// gzip` and the server transparently decompresses before
// unmarshaling. 0 disables compression — matches the pre-Phase-B
// wire format byte-for-byte. Operators on bandwidth-constrained
// links with large values (>1 KiB) typically see meaningful
// reductions; values smaller than the threshold pay no cost.
//
// Server compatibility: a server with compression disabled will
// reject a gzip-encoded body with HTTP 400. Roll out the threshold
// to all peers before raising it on any peer.
CompressionThreshold int
}
DistHTTPLimits bundles the tunable HTTP-transport limits applied to both the dist HTTP server (inbound request bodies, timeouts, concurrency) and the auto-created dist HTTP client (outbound request timeout, inbound response size). Zero-valued fields fall back to the package defaults.
Use WithDistHTTPLimits to override defaults; partial overrides keep the rest at their default values.
type DistHTTPTransport ¶ added in v0.2.0
type DistHTTPTransport struct {
// contains filtered or unexported fields
}
DistHTTPTransport implements DistTransport over HTTP JSON.
func NewDistHTTPTransport ¶ added in v0.2.0
func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport
NewDistHTTPTransport constructs a DistHTTPTransport with the given timeout and nodeID->baseURL resolver. Timeout <=0 defaults to defaultDistHTTPClientTimeout. Response bodies are bounded by defaultDistHTTPResponseLimit; use NewDistHTTPTransportWithLimits to override.
func NewDistHTTPTransportWithAuth ¶ added in v0.4.2
func NewDistHTTPTransportWithAuth(limits DistHTTPLimits, auth DistHTTPAuth, resolver func(string) (string, bool)) *DistHTTPTransport
NewDistHTTPTransportWithAuth combines explicit limits and auth policy in a single constructor. DistMemory uses this when WithDistHTTPAuth is set so the auto-created HTTP client signs requests with the same token the server validates against.
If limits.TLSConfig is non-nil, the underlying http.Transport is configured with the same *tls.Config used by the server, so client connections to peer https:// endpoints handshake against the same roots and certificates.
func NewDistHTTPTransportWithLimits ¶ added in v0.4.2
func NewDistHTTPTransportWithLimits(limits DistHTTPLimits, resolver func(string) (string, bool)) *DistHTTPTransport
NewDistHTTPTransportWithLimits is the explicit-limits variant. Use it when the caller needs to raise/lower the response-body cap or align the client timeout with custom DistHTTPLimits applied to the server.
func (*DistHTTPTransport) FetchMerkle ¶ added in v0.2.0
func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error)
FetchMerkle retrieves a Merkle tree snapshot from a remote node.
func (*DistHTTPTransport) ForwardGet ¶ added in v0.2.0
func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error)
ForwardGet fetches a single item from a remote node.
func (*DistHTTPTransport) ForwardRemove ¶ added in v0.2.0
func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error
ForwardRemove propagates a delete operation to a remote node.
func (*DistHTTPTransport) ForwardSet ¶ added in v0.2.0
func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error
ForwardSet sends a Set/Replicate request to a remote node.
func (*DistHTTPTransport) Gossip ¶ added in v0.6.5
func (t *DistHTTPTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error
Gossip pushes a member-list snapshot to the target's `/internal/gossip` endpoint. The receiver merges via higher-incarnation-wins and may self-refute if the snapshot claims it's suspect — see acceptGossip + refuteIfSuspected.
The body is a JSON array of GossipMember; the wire shape is stable (separate type from cluster.Node) so the cluster package can add internal fields without breaking peers running older binaries.
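To illustrate the higher-incarnation-wins merge, here is a small sketch. The member struct and its fields are hypothetical (GossipMember's fields are not shown on this page), and keeping the local entry on an incarnation tie is an assumption. Self-refutation is not modeled.

```go
package main

// member is a hypothetical wire entry for one cluster member.
type member struct {
	ID          string
	State       string
	Incarnation uint64
}

// merge folds a received snapshot into the local view. A remote entry
// replaces the local one only when it is new or carries a strictly higher
// incarnation number.
func merge(local map[string]member, remote []member) {
	for _, r := range remote {
		cur, ok := local[r.ID]
		if !ok || r.Incarnation > cur.Incarnation {
			local[r.ID] = r
		}
	}
}
```

Stale snapshots are harmless under this rule because an older incarnation can never overwrite a newer one.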
func (*DistHTTPTransport) Health ¶ added in v0.2.0
func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error
Health performs a health probe against a remote node.
func (*DistHTTPTransport) IndirectHealth ¶ added in v0.6.0
func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
IndirectHealth asks the relay node to probe the target on this caller's behalf. The dist HTTP server's `/internal/probe?target=<id>` endpoint runs a Health() call on its own transport and returns 200 if the target is reachable from the relay's vantage point. Used by the SWIM indirect-probe path to filter caller-side network blips before marking a peer suspect.
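func (*DistHTTPTransport) ListKeys ¶ added in v0.2.0
func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error)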
ListKeys returns keys held on a remote node, optionally filtered by `pattern` (forwarded server-side as the `q` query parameter on `/internal/keys`). An empty pattern returns the full key set (expensive; used for tests / anti-entropy fallback). Patterns containing glob metacharacters (* ? [) are matched server-side via path.Match; non-glob patterns are treated as prefix.
Walks the cursor-paginated `/internal/keys` endpoint to assemble the full result. Callers that need to bound the result-set size should layer their own cap on top.
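The pattern rules above (empty matches everything, glob metacharacters go through path.Match, anything else is a prefix) might be implemented along these lines. keyMatcher is a hypothetical helper, and treating a malformed glob as a non-match is an assumption of this sketch.

```go
package main

import (
	"path"
	"strings"
)

// keyMatcher builds a predicate for the given pattern.
func keyMatcher(pattern string) func(key string) bool {
	if pattern == "" {
		// Empty pattern: every key matches.
		return func(string) bool { return true }
	}
	if strings.ContainsAny(pattern, "*?[") {
		// Glob pattern: delegate to path.Match.
		return func(key string) bool {
			ok, err := path.Match(pattern, key)
			return err == nil && ok
		}
	}
	// Plain pattern: prefix match.
	return func(key string) bool { return strings.HasPrefix(key, pattern) }
}
```

Note that path.Match does not let `*` or `?` match a `/`, so keys containing slashes need a pattern that spells the slash out.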
type DistMemory ¶ added in v0.2.0
type DistMemory struct {
// contains filtered or unexported fields
}
DistMemory is a sharded in-process distributed-like backend. It simulates distribution by consistent hashing across a fixed set of in-memory shards. It is intended for single-process multi-shard experimentation; NOT cross-process.
func (*DistMemory) AddPeer ¶ added in v0.2.2
func (dm *DistMemory) AddPeer(address string)
AddPeer adds a peer address into local membership (best-effort, no network validation). If the peer already exists (by address) it's ignored. Used by tests to simulate join propagation.
func (*DistMemory) BuildMerkleTree ¶ added in v0.2.0
func (dm *DistMemory) BuildMerkleTree() *MerkleTree
BuildMerkleTree constructs a Merkle tree snapshot of local data (best-effort, locks each shard briefly).
func (*DistMemory) Capacity ¶ added in v0.2.0
func (dm *DistMemory) Capacity() int
Capacity returns logical capacity.
func (*DistMemory) Clear ¶ added in v0.2.0
func (dm *DistMemory) Clear(ctx context.Context) error
Clear wipes all shards.
func (*DistMemory) Count ¶ added in v0.2.0
func (dm *DistMemory) Count(_ context.Context) int
Count returns total items across shards.
func (*DistMemory) DebugDropLocal ¶ added in v0.2.0
func (dm *DistMemory) DebugDropLocal(key string)
DebugDropLocal removes a key only from the local shard (for tests / read-repair validation).
func (*DistMemory) DebugInject ¶ added in v0.2.0
func (dm *DistMemory) DebugInject(it *cache.Item)
DebugInject stores an item directly into the local shard (no replication / ownership checks) for tests.
func (*DistMemory) DebugOwners ¶ added in v0.2.0
func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID
DebugOwners returns current owners slice for a key (for tests).
func (*DistMemory) DistMembershipSnapshot ¶ added in v0.2.1
func (dm *DistMemory) DistMembershipSnapshot() map[string]any
DistMembershipSnapshot returns a lightweight membership view (states and version).
func (*DistMemory) Drain ¶ added in v0.6.0
func (dm *DistMemory) Drain(_ context.Context) error
Drain marks this node for graceful shutdown: future Set/Remove return sentinel.ErrDraining, /health reports HTTP 503 so external load balancers stop routing traffic, and the operator should follow up with Stop after Drain has settled. Get continues to serve so in-flight reads complete with consistent data.
Drain is one-way and idempotent — the second call is a no-op (returns nil). Operators clear it by restarting the process.
Returns nil today; the signature retains an error so future versions can wait for active replication fan-out to flush before returning (Phase B's hint queue makes that meaningful) without a breaking change.
func (*DistMemory) EventBus ¶ added in v0.7.1
func (dm *DistMemory) EventBus() *eventbus.Bus
EventBus returns the in-process broadcaster for topology events. The management HTTP server's SSE handler subscribes here. Returns nil only on the (test-only) path where DistMemory was constructed without going through NewDistMemory.
func (*DistMemory) IsDraining ¶ added in v0.6.0
func (dm *DistMemory) IsDraining() bool
IsDraining reports whether Drain has been called on this node. Operator helper for dashboards / readiness probes that want to surface drain state independently of the dist HTTP endpoint.
func (*DistMemory) IsOwner ¶ added in v0.2.2
func (dm *DistMemory) IsOwner(key string) bool
IsOwner reports whether this node is an owner (primary or replica) for key. Exported for tests / external observability (thin wrapper over internal logic).
func (*DistMemory) LatencyHistograms ¶ added in v0.2.1
func (dm *DistMemory) LatencyHistograms() map[string][]uint64
LatencyHistograms returns a snapshot of latency bucket counts per operation (ns buckets; last bucket +Inf).
func (*DistMemory) LifecycleContext ¶ added in v0.4.2
func (dm *DistMemory) LifecycleContext() context.Context
LifecycleContext returns the server-lifecycle context derived from the ctx supplied to NewDistMemory. Stop cancels this context, so callers (including HTTP handlers and background loops) can observe shutdown without polling the various stopCh channels. Read-only — modifying the returned ctx has no effect.
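func (*DistMemory) List ¶ added in v0.2.0
func (dm *DistMemory) List(_ context.Context, _ ...IFilter) ([]*cache.Item, error)
List aggregates items across shards without ordering. The filters from the interface contract are not yet integrated; the implementation is kept simple.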
func (*DistMemory) ListKeys ¶ added in v0.8.7
func (dm *DistMemory) ListKeys(ctx context.Context, pattern string, maxResults int) (ListKeysResult, error)
ListKeys enumerates keys across every alive peer (including this node), deduplicates by string identity, sorts, and returns up to `maxResults` results. `pattern` follows the same prefix/glob rules as `buildKeyMatcher`. A `maxResults` of 0 falls back to defaultListKeysMax; the hard ceiling lives in the v1 handler that calls this method.
Per-peer failures (transport error, unreachable, etc.) don't fail the whole call — best-effort matches the read-repair and hint-replay contracts elsewhere in the cluster. Failed peers are returned in PartialNodes so the caller can surface a banner to the operator.
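A rough sketch of the best-effort aggregation described above. Everything here is illustrative: listResult only assumes a Keys field next to the documented PartialNodes, fetch is a hypothetical per-peer lister, and the value of defaultListKeysMax is invented.

```go
package main

import (
	"errors"
	"sort"
)

// errUnreachable is a sample per-peer failure.
var errUnreachable = errors.New("peer unreachable")

// defaultListKeysMax is a hypothetical fallback cap.
const defaultListKeysMax = 1000

// listResult is a reduced stand-in for ListKeysResult.
type listResult struct {
	Keys         []string
	PartialNodes []string
}

// collectKeys queries every peer, records failures instead of aborting,
// then deduplicates, sorts, and caps the merged key set.
func collectKeys(peers []string, fetch func(peer string) ([]string, error), maxResults int) listResult {
	if maxResults <= 0 {
		maxResults = defaultListKeysMax
	}
	var res listResult
	seen := make(map[string]struct{})
	for _, p := range peers {
		keys, err := fetch(p)
		if err != nil {
			res.PartialNodes = append(res.PartialNodes, p)
			continue
		}
		for _, k := range keys {
			seen[k] = struct{}{}
		}
	}
	res.Keys = make([]string, 0, len(seen))
	for k := range seen {
		res.Keys = append(res.Keys, k)
	}
	sort.Strings(res.Keys)
	if len(res.Keys) > maxResults {
		res.Keys = res.Keys[:maxResults]
	}
	return res
}
```

Sorting before truncating keeps the capped result deterministic regardless of peer iteration order.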
func (*DistMemory) LocalContains ¶ added in v0.2.0
func (dm *DistMemory) LocalContains(key string) bool
LocalContains returns true if key exists in local shard (ignores ownership).
func (*DistMemory) LocalNodeAddr ¶ added in v0.2.0
func (dm *DistMemory) LocalNodeAddr() string
LocalNodeAddr returns the configured node address (host:port) used by HTTP server.
func (*DistMemory) LocalNodeID ¶ added in v0.2.0
func (dm *DistMemory) LocalNodeID() cluster.NodeID
LocalNodeID returns this instance's node ID (testing helper).
func (*DistMemory) Membership ¶ added in v0.2.0
func (dm *DistMemory) Membership() *cluster.Membership
Membership returns current membership reference (read-only usage).
func (*DistMemory) Metrics ¶ added in v0.2.0
func (dm *DistMemory) Metrics() DistMetrics
Metrics returns a snapshot of distributed metrics.
func (*DistMemory) Remove ¶ added in v0.2.0
func (dm *DistMemory) Remove(ctx context.Context, keys ...string) error
Remove deletes keys.
func (*DistMemory) RemovePeer ¶ added in v0.2.2
func (dm *DistMemory) RemovePeer(address string)
RemovePeer removes a peer by address (best-effort) to simulate node leave in tests.
func (*DistMemory) Ring ¶ added in v0.2.0
func (dm *DistMemory) Ring() *cluster.Ring
Ring returns the ring reference.
func (*DistMemory) SetCapacity ¶ added in v0.2.0
func (dm *DistMemory) SetCapacity(capacity int)
SetCapacity sets logical capacity.
func (*DistMemory) SetLocalNode ¶ added in v0.2.0
func (dm *DistMemory) SetLocalNode(node *cluster.Node)
SetLocalNode manually sets the local node (testing helper before starting HTTP).
func (*DistMemory) SetTransport ¶ added in v0.2.0
func (dm *DistMemory) SetTransport(t DistTransport)
SetTransport sets the transport post-construction (testing helper).
func (*DistMemory) Stop ¶ added in v0.2.0
func (dm *DistMemory) Stop(ctx context.Context) error
Stop terminates every background goroutine started by NewDistMemory and shuts down the optional HTTP server. Idempotent and safe to call concurrently — repeat calls are no-ops. Tests SHOULD register Stop via t.Cleanup to avoid goroutine leaks across `-count=N` iterations under -race.
func (*DistMemory) SyncWith ¶ added in v0.2.0
func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error
SyncWith performs Merkle anti-entropy against a remote node (pull newer versions for differing chunks).
Returns nil on any successful sync, including when the local and remote trees already agree (a "clean" sync). Callers that need to distinguish clean from dirty cycles (currently only the adaptive auto-sync backoff path) use syncWithStatus directly.
type DistMemoryOption ¶ added in v0.2.0
type DistMemoryOption func(*DistMemory)
DistMemoryOption configures DistMemory backend.
func WithDistCapacity ¶ added in v0.2.0
func WithDistCapacity(capacity int) DistMemoryOption
WithDistCapacity sets logical capacity (not strictly enforced yet).
func WithDistChaos ¶ added in v0.8.4
func WithDistChaos(c *Chaos) DistMemoryOption
WithDistChaos enables chaos injection on the dist transport. Pass a *Chaos constructed via NewChaos; configure faults via the Chaos's SetDropRate / SetLatency methods.
Option ordering does not matter: WithDistChaos records the reference, and storeTransport wraps the active transport whenever it's set (including the auto-wired HTTP transport).
FOR TESTS ONLY. There's no safety check preventing this from being applied to a production DistMemory — pointing a real cluster at a Chaos with DropRate=1.0 will drop every dist transport call. Don't.
func WithDistGossipInterval ¶ added in v0.2.0
func WithDistGossipInterval(d time.Duration) DistMemoryOption
WithDistGossipInterval enables simple membership gossip at the provided interval.
func WithDistHTTPAuth ¶ added in v0.4.2
func WithDistHTTPAuth(auth DistHTTPAuth) DistMemoryOption
WithDistHTTPAuth configures bearer-token (or custom verify/sign) authentication for the dist HTTP server and auto-created HTTP client. See DistHTTPAuth for the policy struct shape and defaults.
Operators must apply the same auth policy to every node in the cluster — peers with mismatched tokens will reject each other's requests with HTTP 401. Like WithDistHTTPLimits this only affects the internal transport; an externally-supplied DistTransport is the caller's responsibility to authenticate.
NewDistMemory validates the resulting policy and returns sentinel.ErrInsecureAuthConfig if ClientSign is set without a matching inbound verifier (Token or ServerVerify) and AllowAnonymousInbound is not set — see DistHTTPAuth for the rationale.
func WithDistHTTPLimits ¶ added in v0.4.2
func WithDistHTTPLimits(limits DistHTTPLimits) DistMemoryOption
WithDistHTTPLimits configures the HTTP transport limits for the dist HTTP server (inbound request bodies, timeouts, concurrency) and the auto-created HTTP client (response body cap, request timeout). Partial overrides are honored: zero-valued fields inherit the package defaults from DistHTTPLimits.withDefaults.
This option only affects the *internal* HTTP server/client created by tryStartHTTP — explicitly-supplied transports via WithDistTransport are the caller's responsibility to bound.
func WithDistHeartbeat ¶ added in v0.2.0
func WithDistHeartbeat(interval, suspectAfter, deadAfter time.Duration) DistMemoryOption
WithDistHeartbeat configures the heartbeat interval and the suspect/dead thresholds. If interval <= 0, the heartbeat is disabled.
func WithDistHeartbeatSample ¶ added in v0.2.1
func WithDistHeartbeatSample(k int) DistMemoryOption
WithDistHeartbeatSample sets how many random peers to probe per heartbeat tick (0=all).
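A minimal sketch of per-tick peer sampling, assuming a uniform random choice without replacement. `samplePeers` is a hypothetical helper, and treating negative k like 0 is an assumption of this sketch. The documentation only specifies that 0 means all peers.

```go
package main

import (
	"fmt"
	"math/rand"
)

// samplePeers returns up to k peers chosen at random. k <= 0 or k >= len(peers)
// returns every peer. The input slice is copied, so the caller's order is kept.
func samplePeers(peers []string, k int) []string {
	out := append([]string(nil), peers...)
	if k <= 0 || k >= len(out) {
		return out
	}
	rand.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
	return out[:k]
}

func main() {
	peers := []string{"node-a", "node-b", "node-c", "node-d"}
	fmt.Println(len(samplePeers(peers, 2)), len(samplePeers(peers, 0)))
}
```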
func WithDistHintMaxBytes ¶ added in v0.2.1
func WithDistHintMaxBytes(b int64) DistMemoryOption
WithDistHintMaxBytes sets an approximate byte cap for all queued hints.
func WithDistHintMaxPerNode ¶ added in v0.2.0
func WithDistHintMaxPerNode(n int) DistMemoryOption
WithDistHintMaxPerNode caps number of queued hints per target node.
func WithDistHintMaxTotal ¶ added in v0.2.1
func WithDistHintMaxTotal(n int) DistMemoryOption
WithDistHintMaxTotal sets a global cap on total queued hints across all nodes.
func WithDistHintReplayInterval ¶ added in v0.2.0
func WithDistHintReplayInterval(d time.Duration) DistMemoryOption
WithDistHintReplayInterval sets how often to attempt replay of hints.
func WithDistHintTTL ¶ added in v0.2.0
func WithDistHintTTL(d time.Duration) DistMemoryOption
WithDistHintTTL sets TTL for hinted handoff entries.
func WithDistIndirectProbes ¶ added in v0.6.0
func WithDistIndirectProbes(k int, timeout time.Duration) DistMemoryOption
WithDistIndirectProbes enables SWIM-style indirect probing for the heartbeat path. When a direct probe to a peer fails, this node asks `k` random alive peers to probe the target on its behalf; the target is only marked suspect if every relay also fails. Filters caller-side network blips (NIC reset, brief upstream outage, single stuck connection in a pool) that would otherwise cause spurious suspect/dead transitions.
`timeout` caps each relay's probe call. Pass 0 to inherit the default (half the configured heartbeat interval).
k = 0 disables indirect probing — direct probe alone decides liveness, matching the pre-Phase-B behavior. Recommended k = 3 for production clusters; clusters with fewer than k+1 alive peers scale down automatically (probe whatever's available).
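The decision rule described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `probeFn`, `isSuspect`, and `onlyVia` are hypothetical names, an empty relay string stands for a direct probe, and the sketch takes the first k relays where the real backend picks random alive peers.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnreachable = errors.New("unreachable")

// probeFn is a hypothetical probe callback. relay == "" means a direct probe.
type probeFn func(relay, target string) error

// isSuspect reports whether target should be marked suspect: the direct probe
// failed and every one of up to k relays also failed to reach it.
func isSuspect(target string, relays []string, k int, probe probeFn) bool {
	if probe("", target) == nil {
		return false // direct probe succeeded
	}
	if k < 0 {
		k = 0
	}
	if k > len(relays) {
		k = len(relays) // scale down to whatever is available
	}
	for _, relay := range relays[:k] {
		if probe(relay, target) == nil {
			return false // a relay reached the target: local network blip
		}
	}
	return true
}

// onlyVia simulates a link where the target is reachable only through one relay.
func onlyVia(okRelay string) probeFn {
	return func(relay, _ string) error {
		if relay != "" && relay == okRelay {
			return nil
		}
		return errUnreachable
	}
}

func main() {
	relays := []string{"a", "b", "c"}
	fmt.Println(isSuspect("t", relays, 3, onlyVia("b"))) // relay b succeeds
	fmt.Println(isSuspect("t", relays, 0, onlyVia("b"))) // indirect probing off
}
```

With k = 0 the loop body never runs, so the direct probe alone decides the outcome, which matches the documented disabled behavior.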
func WithDistListKeysCap ¶ added in v0.2.0
func WithDistListKeysCap(n int) DistMemoryOption
WithDistListKeysCap caps number of keys fetched via fallback ListKeys (0 = unlimited).
func WithDistLogger ¶ added in v0.6.0
func WithDistLogger(logger *slog.Logger) DistMemoryOption
WithDistLogger supplies a structured logger for the dist backend's background loops (heartbeat, hint replay, rebalance, gossip, merkle auto-sync) and operational error surfaces (HTTP listener failures, transport errors, dropped hints). The supplied logger is wrapped with `node_id` and `component=dist_memory` attributes before use, so call sites do not need to weave the node ID through every record.
Pass slog.Default() to inherit the application's logger, or supply a custom *slog.Logger with the desired level / handler. Zero-value (no option call) keeps the dist backend silent — the default uses an io.Discard handler, which means library code never writes to stderr unless the caller opts in.
nil is treated as "no change" — useful when callers conditionally build options.
func WithDistMembership ¶ added in v0.2.0
func WithDistMembership(m *cluster.Membership, node *cluster.Node) DistMemoryOption
WithDistMembership injects an existing membership and (optionally) a local node for multi-node tests. If node is nil a new one will be created.
func WithDistMerkleAdaptiveBackoff ¶ added in v0.8.2
func WithDistMerkleAdaptiveBackoff(maxFactor int) DistMemoryOption
WithDistMerkleAdaptiveBackoff enables adaptive scheduling for the Merkle anti-entropy loop. When maxFactor > 1, the loop doubles its sleep interval (1×, 2×, 4×, 8×, …) after each tick that found zero divergence across every peer, capped at maxFactor. Any tick with at least one dirty peer resets the factor to 1 on the next sleep — recovery is always immediate, never lazy.
maxFactor <= 1 disables backoff (the default): the loop wakes at every `WithDistMerkleAutoSync` interval regardless of recent divergence.
The current factor is exposed as `dist.auto_sync.backoff_factor` and the cumulative count of clean ticks as `dist.auto_sync.clean_ticks`. Each factor change is logged once at Info; no per-tick spam.
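The doubling-and-reset rule can be sketched as a pure function. `nextFactor` is a hypothetical helper written for illustration, and clamping a non-power-of-two maxFactor to maxFactor itself is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// nextFactor returns the sleep multiplier for the next tick.
// A dirty tick, or maxFactor <= 1, always yields 1. A clean tick doubles the
// current factor, capped at maxFactor.
func nextFactor(current, maxFactor int, clean bool) int {
	if maxFactor <= 1 || !clean {
		return 1
	}
	if current < 1 {
		current = 1
	}
	next := current * 2
	if next > maxFactor {
		next = maxFactor
	}
	return next
}

func main() {
	base := 30 * time.Second // hypothetical WithDistMerkleAutoSync interval
	factor := 1
	for _, clean := range []bool{true, true, true, true, false, true} {
		factor = nextFactor(factor, 8, clean)
		fmt.Println(clean, time.Duration(factor)*base)
	}
}
```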
func WithDistMerkleAutoSync ¶ added in v0.2.0
func WithDistMerkleAutoSync(interval time.Duration) DistMemoryOption
WithDistMerkleAutoSync enables periodic anti-entropy sync attempts. If interval <= 0, auto-sync is disabled.
func WithDistMerkleAutoSyncPeers ¶ added in v0.2.0
func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption
WithDistMerkleAutoSyncPeers limits the number of peers synced per interval (<= 0 = all).
func WithDistMerkleChunkSize ¶ added in v0.2.0
func WithDistMerkleChunkSize(n int) DistMemoryOption
WithDistMerkleChunkSize sets the number of keys per leaf hash chunk (default 128 if 0).
func WithDistMeterProvider ¶ added in v0.6.0
func WithDistMeterProvider(mp metric.MeterProvider) DistMemoryOption
WithDistMeterProvider supplies an OpenTelemetry MeterProvider for the dist backend. When set, NewDistMemory registers an observable instrument for every field on DistMetrics — counters for cumulative totals (writes, forwards, hints, rebalance batches, etc.), gauges for current state (active tombstones, hint queue size, alive/suspect/dead member counts) and last-operation latencies (merkle build/diff/fetch nanoseconds, last rebalance/auto-sync duration). Instrument names use the `dist.` prefix so a Prometheus exporter can route them under a dedicated subsystem.
A single registered callback drives all instruments: on each collection cycle it takes one Metrics() snapshot and observes every instrument from that snapshot. There is no per-operation overhead when a real meter is configured beyond the existing atomic counters the dist backend already maintains.
Pass otel.GetMeterProvider() to inherit the application's globally registered provider, or supply a custom MeterProvider built via the otel/sdk/metric package (typically wrapping a Prometheus exporter or OTLP pipeline). nil is treated as "no change" — useful for conditional option building.
Library default (no option call) installs a no-op meter, so library code emits no metrics unless the caller opts in.
func WithDistNode ¶ added in v0.2.0
func WithDistNode(id, address string) DistMemoryOption
WithDistNode sets the local node identity (id is optional and is derived from address if empty). The address is used for future RPC.
func WithDistParallelReads ¶ added in v0.2.0
func WithDistParallelReads(enable bool) DistMemoryOption
WithDistParallelReads enables parallel quorum/all read fan-out.
func WithDistReadConsistency ¶ added in v0.2.0
func WithDistReadConsistency(l ConsistencyLevel) DistMemoryOption
WithDistReadConsistency sets read consistency (default ONE).
func WithDistReadRepairBatch ¶ added in v0.8.4
func WithDistReadRepairBatch(interval time.Duration, maxBatchSize int) DistMemoryOption
WithDistReadRepairBatch enables async coalescing of read-repair fan-out. When interval > 0, repairs from the read path are queued by destination peer; the queue flushes periodically OR when a peer's pending count hits maxBatchSize. Repairs to the same (peer, key) collapse to the highest-version entry — concurrent reads of the same hot key produce one repair, not N.
Default (interval = 0 or maxBatchSize <= 0): repairs dispatch synchronously inside the Get path. Existing callers asserting "replica healed by the time Get returns" see byte-identical behavior.
Trade-off: batched mode introduces a window (up to `interval`) where a divergent replica stays divergent. Merkle anti-entropy is the convergence safety net; the read-repair path is and always was best-effort. Stop() drains pending entries before returning so a clean shutdown doesn't lose queued repairs.
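A minimal sketch of the coalescing rule, assuming a map keyed by peer and then by key. The `repair` and `coalescer` types are hypothetical, the periodic timer-driven flush is omitted, and the sketch is not safe for concurrent use. A real implementation would need a lock or a single owner goroutine.

```go
package main

import (
	"fmt"
	"sort"
)

// repair is a hypothetical queued read-repair entry.
type repair struct {
	Key     string
	Version uint64
}

// coalescer keeps, per peer, only the highest-version repair for each key.
type coalescer struct {
	maxBatch int
	pending  map[string]map[string]repair
}

func newCoalescer(maxBatch int) *coalescer {
	return &coalescer{maxBatch: maxBatch, pending: make(map[string]map[string]repair)}
}

// enqueue records r for peer and returns a batch to send once the peer's
// pending count reaches maxBatch. Otherwise it returns nil.
func (c *coalescer) enqueue(peer string, r repair) []repair {
	byKey := c.pending[peer]
	if byKey == nil {
		byKey = make(map[string]repair)
		c.pending[peer] = byKey
	}
	if cur, ok := byKey[r.Key]; !ok || r.Version > cur.Version {
		byKey[r.Key] = r
	}
	if len(byKey) < c.maxBatch {
		return nil
	}
	return c.take(peer)
}

// take removes and returns the peer's pending repairs, sorted by key.
func (c *coalescer) take(peer string) []repair {
	byKey := c.pending[peer]
	batch := make([]repair, 0, len(byKey))
	for _, r := range byKey {
		batch = append(batch, r)
	}
	delete(c.pending, peer)
	sort.Slice(batch, func(i, j int) bool { return batch[i].Key < batch[j].Key })
	return batch
}

// drain empties every queue, as a clean shutdown would before returning.
func (c *coalescer) drain() map[string][]repair {
	out := make(map[string][]repair, len(c.pending))
	for peer := range c.pending {
		out[peer] = c.take(peer)
	}
	return out
}

func main() {
	c := newCoalescer(2)
	c.enqueue("peer-1", repair{Key: "hot", Version: 1})
	c.enqueue("peer-1", repair{Key: "hot", Version: 3}) // same key: collapses
	fmt.Println(c.enqueue("peer-1", repair{Key: "cold", Version: 1}))
}
```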
func WithDistRebalanceBatchSize ¶ added in v0.2.1
func WithDistRebalanceBatchSize(n int) DistMemoryOption
WithDistRebalanceBatchSize sets max keys per transfer batch.
func WithDistRebalanceInterval ¶ added in v0.2.1
func WithDistRebalanceInterval(d time.Duration) DistMemoryOption
WithDistRebalanceInterval enables periodic ownership rebalancing checks (<=0 disables).
func WithDistRebalanceMaxConcurrent ¶ added in v0.2.1
func WithDistRebalanceMaxConcurrent(n int) DistMemoryOption
WithDistRebalanceMaxConcurrent limits concurrent batch transfers.
func WithDistRemovalGrace ¶ added in v0.2.2
func WithDistRemovalGrace(d time.Duration) DistMemoryOption
WithDistRemovalGrace sets grace period before shedding data for keys we no longer own (<=0 immediate remove disabled for now).
func WithDistReplicaDiffMaxPerTick ¶ added in v0.2.2
func WithDistReplicaDiffMaxPerTick(n int) DistMemoryOption
WithDistReplicaDiffMaxPerTick limits number of replica-diff replication operations performed per rebalance tick (0 = unlimited).
func WithDistReplication ¶ added in v0.2.0
func WithDistReplication(n int) DistMemoryOption
WithDistReplication sets ring replication factor (owners per key).
func WithDistSeeds ¶ added in v0.2.0
func WithDistSeeds(addresses []string) DistMemoryOption
WithDistSeeds configures static seed node addresses.
func WithDistShardCount ¶ added in v0.2.0
func WithDistShardCount(n int) DistMemoryOption
WithDistShardCount sets number of shards (min 1).
func WithDistTombstoneSweep ¶ added in v0.2.0
func WithDistTombstoneSweep(interval time.Duration) DistMemoryOption
WithDistTombstoneSweep sets sweep interval for tombstone compaction (<=0 disables automatic sweeps).
func WithDistTombstoneTTL ¶ added in v0.2.0
func WithDistTombstoneTTL(d time.Duration) DistMemoryOption
WithDistTombstoneTTL configures how long tombstones are retained before subject to compaction (<=0 keeps indefinitely).
func WithDistTracerProvider ¶ added in v0.6.0
func WithDistTracerProvider(tp trace.TracerProvider) DistMemoryOption
WithDistTracerProvider supplies an OpenTelemetry TracerProvider for the dist backend. When set, every public Get/Set/Remove call opens a span (`dist.get` / `dist.set` / `dist.remove`) carrying consistency level and key length attributes; replication fan-out adds child spans (`dist.replicate.set` / `dist.replicate.remove`) per peer so operators can see where time is spent under load.
Span attributes intentionally omit the cache key value — keys can be PII (user IDs, session tokens). Only `cache.key.length` is recorded. Callers needing the key value should add their own outer span before invoking the dist backend.
Pass otel.GetTracerProvider() to inherit the application's globally registered provider, or supply a custom *sdktrace.TracerProvider to route dist spans to a dedicated exporter. nil is treated as "no change" — useful for conditional option building.
Library default (no option call) installs a no-op tracer, so library code emits no spans unless the caller opts in.
func WithDistTransport ¶ added in v0.2.0
func WithDistTransport(t DistTransport) DistMemoryOption
WithDistTransport sets a transport used for forwarding / replication.
func WithDistVirtualNodes ¶ added in v0.2.0
func WithDistVirtualNodes(n int) DistMemoryOption
WithDistVirtualNodes sets number of virtual nodes per physical node for consistent hash ring.
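To illustrate how virtual nodes and the replication factor interact, here is a generic consistent-hash ring sketch. It assumes FNV-1a hashing and a clockwise walk that collects distinct physical nodes. The `ring` type, `newRing`, and `owners` are hypothetical, and the package's actual ring may hash and place points differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
)

// ring maps hash points to physical nodes. Each node owns several points.
type ring struct {
	points []uint32
	owner  map[uint32]string
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing places vnodes points per physical node on the ring.
func newRing(nodes []string, vnodes int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for v := 0; v < vnodes; v++ {
			p := hashOf(n + "#" + strconv.Itoa(v))
			if _, taken := r.owner[p]; taken {
				continue // hash collision: the first node keeps the point
			}
			r.owner[p] = n
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// owners walks clockwise from the key's hash and returns up to `replication`
// distinct physical nodes.
func (r *ring) owners(key string, replication int) []string {
	if len(r.points) == 0 || replication <= 0 {
		return nil
	}
	h := hashOf(key)
	start := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	seen := make(map[string]bool)
	var out []string
	for i := 0; i < len(r.points) && len(out) < replication; i++ {
		n := r.owner[r.points[(start+i)%len(r.points)]]
		if !seen[n] {
			seen[n] = true
			out = append(out, n)
		}
	}
	return out
}

func main() {
	r := newRing([]string{"node-a", "node-b", "node-c"}, 64)
	fmt.Println(r.owners("user:42", 2))
}
```

More virtual nodes per physical node generally spread keys more evenly, at the cost of a larger sorted point list.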
func WithDistWriteConsistency ¶ added in v0.2.0
func WithDistWriteConsistency(l ConsistencyLevel) DistMemoryOption
WithDistWriteConsistency sets write consistency (default QUORUM).
type DistMetrics ¶ added in v0.2.0
type DistMetrics struct {
ForwardGet int64
ForwardSet int64
ForwardRemove int64
ReplicaFanoutSet int64
ReplicaFanoutRemove int64
ReadRepair int64
ReadRepairBatched int64 // subset of ReadRepair dispatched via the async coalescer
ReadRepairCoalesced int64 // repairs short-circuited by the coalescer (duplicate same-version entries)
ReplicaGetMiss int64
HeartbeatSuccess int64
HeartbeatFailure int64
IndirectProbeSuccess int64
IndirectProbeFailure int64
IndirectProbeRefuted int64
Drains int64
NodesSuspect int64
NodesDead int64
NodesRemoved int64
VersionConflicts int64
VersionTieBreaks int64
ReadPrimaryPromote int64
HintedQueued int64
HintedReplayed int64
HintedExpired int64
HintedDropped int64
HintedGlobalDropped int64
HintedBytes int64
MigrationHintQueued int64 // subset of HintedQueued attributable to rebalance migrations
MigrationHintReplayed int64 // subset of HintedReplayed for migration hints
MigrationHintExpired int64 // subset of HintedExpired for migration hints
MigrationHintDropped int64 // subset of HintedDropped + HintedGlobalDropped for migration hints
MigrationHintLastAgeNanos int64 // queue residency of the most-recently-replayed migration hint (ns)
MerkleSyncs int64
MerkleKeysPulled int64
MerkleBuildNanos int64
MerkleDiffNanos int64
MerkleFetchNanos int64
AutoSyncLoops int64
LastAutoSyncNanos int64
LastAutoSyncError string
AutoSyncCleanTicks int64 // cumulative ticks where every peer returned no divergence
AutoSyncBackoffFactor int64 // current adaptive-backoff multiplier (1 when disabled or freshly reset)
ChaosDrops int64 // transport calls dropped by configured Chaos (test-only; zero in prod)
ChaosLatencies int64 // transport calls that had latency injected by Chaos (test-only)
TombstonesActive int64
TombstonesPurged int64
WriteQuorumFailures int64
WriteAcks int64
WriteAttempts int64
WriteForwardPromotion int64
WriteApplyRefused int64
RebalancedKeys int64
RebalanceBatches int64
RebalanceThrottle int64
RebalanceLastNanos int64
RebalancedReplicaDiff int64
RebalanceReplicaDiffThrottle int64
RebalancedPrimary int64
MembershipVersion uint64 // current membership version (incremented on changes)
MembersAlive int64 // current alive members
MembersSuspect int64 // current suspect members
MembersDead int64 // current dead members
}
DistMetrics is a point-in-time snapshot of the dist backend's counters and gauges.
type DistTransport ¶ added in v0.2.0
type DistTransport interface {
ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error
ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error)
ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error
Health(ctx context.Context, nodeID string) error
// IndirectHealth asks `relayNodeID` to probe `targetNodeID` on the
// caller's behalf. Used by the SWIM-style indirect-probe path: when
// a direct probe to target fails, several relay nodes are asked to
// probe target; if any of them succeeds, the target is alive and
// the caller's local network was the issue, not the target.
// Returns nil when the relay reports the target reachable.
IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
// Gossip pushes the caller's full member-list snapshot to
// `targetNodeID`. The receiver merges it via higher-incarnation-
// wins and self-refutes if the snapshot claims it is suspect.
// Used by the cross-process gossip path; in-process clusters
// short-circuit to a direct method call instead.
Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error
FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error)
// ListKeys enumerates keys held on the remote node's shards,
// optionally filtered by `pattern`. Empty pattern returns all
// keys; a pattern containing any glob meta-character (* ? [) is
// matched via path.Match, otherwise treated as a prefix.
// Implementations walk the per-shard cursor pagination internally
// and return the materialized key set to the caller; safe for
// node-scale enumeration, capped at DistMemory.listKeysMax to
// bound memory.
ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error)
}
DistTransport defines forwarding operations needed by DistMemory.
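The ListKeys pattern rule in the interface comments (empty matches everything, glob meta-characters go through path.Match, anything else is a prefix) can be sketched as a single function. `matchKey` is a hypothetical helper, not an exported function of this package.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// matchKey applies the documented ListKeys pattern semantics to one key.
func matchKey(pattern, key string) (bool, error) {
	if pattern == "" {
		return true, nil // empty pattern returns all keys
	}
	if strings.ContainsAny(pattern, "*?[") {
		// Glob form. Note that path.Match never lets * or ? match a '/'.
		return path.Match(pattern, key)
	}
	return strings.HasPrefix(key, pattern), nil
}

func main() {
	for _, p := range []string{"", "user:", "user:*", "user:?"} {
		ok, err := matchKey(p, "user:42")
		fmt.Printf("%q %v %v\n", p, ok, err)
	}
}
```

A malformed glob such as an unterminated character class surfaces as path.ErrBadPattern, so callers of a helper like this should check the error as well as the boolean.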
type GossipMember ¶ added in v0.6.5
type GossipMember struct {
ID string `json:"id"`
Address string `json:"address"`
State string `json:"state"`
Incarnation uint64 `json:"incarnation"`
}
GossipMember is the wire-friendly snapshot of a cluster.Node used by the Gossip transport method. Stays a separate struct from cluster.Node so the wire schema doesn't drift when the cluster package adds internal fields.
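As an illustration of the wire shape and of the higher-incarnation-wins merge mentioned in the DistTransport comments, the sketch below redeclares the struct locally. `mergeMembers` is a hypothetical helper, the state strings are assumed values, and keeping the local entry on an incarnation tie is a choice of this sketch. Self-refutation is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GossipMember is copied from the declaration above so the example compiles
// on its own.
type GossipMember struct {
	ID          string `json:"id"`
	Address     string `json:"address"`
	State       string `json:"state"`
	Incarnation uint64 `json:"incarnation"`
}

// mergeMembers folds an incoming snapshot into the local view. An entry is
// replaced only when the incoming incarnation is strictly higher.
func mergeMembers(local map[string]GossipMember, incoming []GossipMember) {
	for _, m := range incoming {
		cur, ok := local[m.ID]
		if !ok || m.Incarnation > cur.Incarnation {
			local[m.ID] = m
		}
	}
}

func main() {
	wire, err := json.Marshal(GossipMember{
		ID: "n1", Address: "10.0.0.1:7946", State: "alive", Incarnation: 3,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wire))
}
```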
type IBackend ¶
type IBackend[T IBackendConstrain] interface {
// Get retrieves the item with the given key from the cache.
// If the key is not found in the cache, it returns nil.
Get(ctx context.Context, key string) (item *cache.Item, ok bool)
// Set adds a new item to the cache.
Set(ctx context.Context, item *cache.Item) error
// Capacity returns the maximum number of items that can be stored in the cache.
Capacity() int
// SetCapacity sets the maximum number of items that can be stored in the cache.
SetCapacity(capacity int)
// Count returns the number of items currently stored in the cache.
Count(ctx context.Context) int
// Remove deletes the item with the given key from the cache.
Remove(ctx context.Context, keys ...string) error
// List the items in the cache that meet the specified criteria.
List(ctx context.Context, filters ...IFilter) (items []*cache.Item, err error)
// Clear removes all items from the cache.
Clear(ctx context.Context) error
}
IBackend defines the contract that all cache backends must implement. It provides a generic interface for cache operations with type safety through the IBackendConstrain type parameter.
Type parameter T must satisfy IBackendConstrain, limiting implementations to supported backend types like InMemory and Redis.
All methods accept a context.Context parameter for cancellation and timeout control, enabling graceful handling of long-running operations.
func NewDistMemory ¶ added in v0.2.0
func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[DistMemory], error)
NewDistMemory creates a new DistMemory backend.
func NewDistMemoryWithConfig ¶ added in v0.2.1
func NewDistMemoryWithConfig(ctx context.Context, cfg any, opts ...DistMemoryOption) (IBackend[DistMemory], error)
NewDistMemoryWithConfig builds a DistMemory from an external dist.Config shape without introducing a direct import here. Accepts a generic 'cfg' to avoid adding a dependency layer; expects exported fields matching internal/dist Config.
func NewInMemory ¶
NewInMemory creates a new in-memory cache with the given options.
func NewRedisCluster ¶ added in v0.1.8
func NewRedisCluster(redisOptions ...Option[RedisCluster]) (IBackend[RedisCluster], error)
NewRedisCluster creates a new Redis Cluster backend with the given options.
type IBackendConstrain ¶
type IBackendConstrain interface {
InMemory | Redis | RedisCluster | DistMemory
}
IBackendConstrain defines the type constraint for cache backend implementations. It restricts the generic type parameter to supported backend types, ensuring type safety and proper implementation at compile time.
type IFilter ¶
type IFilter interface {
ApplyFilter(backendType string, items []*cache.Item) ([]*cache.Item, error)
}
IFilter is a backend agnostic interface for a filter that can be applied to a list of items.
func WithFilterFunc ¶
WithFilterFunc returns a filter that filters the items by a given field's value.
func WithSortBy ¶
WithSortBy returns a filter that sorts the items by a given field.
type InMemory ¶
type InMemory struct {
// contains filtered or unexported fields
}
InMemory is a cache backend that stores the items in memory, leveraging a custom sharded ConcurrentMap. Thread-safety is provided by the underlying ConcurrentMap; no backend-level mutex is needed.
func (*InMemory) Get ¶
Get retrieves the item with the given key from the cacheBackend. If the item is not found, it returns nil.
func (*InMemory) List ¶
List returns a list of all items in the cache filtered and ordered by the given options.
func (*InMemory) Remove ¶
Remove removes the items with the given keys from the cacheBackend. If an item is not found, it does nothing.
func (*InMemory) SetCapacity ¶
SetCapacity sets the capacity of the cache.
type InProcessTransport ¶ added in v0.2.0
type InProcessTransport struct {
// contains filtered or unexported fields
}
InProcessTransport implements DistTransport for multiple DistMemory instances in the same process.
func NewInProcessTransport ¶ added in v0.2.0
func NewInProcessTransport() *InProcessTransport
NewInProcessTransport creates a new empty transport.
func (*InProcessTransport) FetchMerkle ¶ added in v0.2.0
func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error)
FetchMerkle fetches a remote merkle tree.
func (*InProcessTransport) ForwardGet ¶ added in v0.2.0
func (t *InProcessTransport) ForwardGet(_ context.Context, nodeID, key string) (*cache.Item, bool, error)
ForwardGet forwards a get operation to the specified backend node.
func (*InProcessTransport) ForwardRemove ¶ added in v0.2.0
func (t *InProcessTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error
ForwardRemove forwards a remove operation.
func (*InProcessTransport) ForwardSet ¶ added in v0.2.0
func (t *InProcessTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error
ForwardSet forwards a set operation to the specified backend node.
func (*InProcessTransport) Gossip ¶ added in v0.6.5
func (t *InProcessTransport) Gossip(_ context.Context, targetNodeID string, members []GossipMember) error
Gossip delivers the snapshot directly to the target backend's acceptGossip — this is the in-process equivalent of the HTTP `/internal/gossip` endpoint, with the type translation done inline so the rest of the SWIM machinery can stay agnostic to transport choice.
func (*InProcessTransport) Health ¶ added in v0.2.0
func (t *InProcessTransport) Health(_ context.Context, nodeID string) error
Health probes a backend.
func (*InProcessTransport) IndirectHealth ¶ added in v0.6.0
func (t *InProcessTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
IndirectHealth asks the relay backend to probe target. In-process the relay's perspective on target is the same lookup table, so this is equivalent to a direct probe — tests that wire two InProcessTransport instances per cluster will exercise the relay-failure path naturally.
func (*InProcessTransport) ListKeys ¶ added in v0.8.7
ListKeys enumerates the remote node's local shards, optionally filtered by pattern. In-process means we have direct access to the target's shards — no HTTP roundtrip, no cursor walking, just a synchronous in-memory scan.
func (*InProcessTransport) Register ¶ added in v0.2.0
func (t *InProcessTransport) Register(b *DistMemory)
Register adds backends; safe to call multiple times.
func (*InProcessTransport) Unregister ¶ added in v0.2.0
func (t *InProcessTransport) Unregister(id string)
Unregister removes a backend (simulate failure in tests).
type ListKeysResult ¶ added in v0.8.7
ListKeysResult bundles a cluster-wide key enumeration with the best-effort accounting the caller needs to communicate partial results to the operator. `Keys` is sorted and deduplicated across owners. `Truncated` is true when the merged set hit `max` and we stopped pulling further pages. `PartialNodes` lists peers whose fan-out call failed — their keys may be missing from `Keys`.
type MerkleTree ¶ added in v0.2.0
MerkleTree represents a binary hash tree over key/version pairs.
func (*MerkleTree) DiffLeafRanges ¶ added in v0.2.0
func (mt *MerkleTree) DiffLeafRanges(other *MerkleTree) []int
DiffLeafRanges compares two trees and returns indexes of differing leaf chunks.
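A simplified sketch of chunked leaf hashing and leaf-level diffing, assuming SHA-256 over sorted key/version pairs. The `entry` type, `leafHashes`, and `diffLeaves` are hypothetical. The real MerkleTree is a binary tree and its hashing scheme is not documented here, so this only illustrates what an index of a differing leaf chunk means.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
)

// entry is a hypothetical key/version pair.
type entry struct {
	Key     string
	Version uint64
}

// leafHashes sorts entries by key and hashes them in chunks of chunkSize.
// chunkSize <= 0 falls back to 128, the documented default chunk size.
func leafHashes(entries []entry, chunkSize int) [][32]byte {
	if chunkSize <= 0 {
		chunkSize = 128
	}
	sorted := append([]entry(nil), entries...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Key < sorted[j].Key })
	var leaves [][32]byte
	for start := 0; start < len(sorted); start += chunkSize {
		end := start + chunkSize
		if end > len(sorted) {
			end = len(sorted)
		}
		var buf []byte
		var v [8]byte
		for _, e := range sorted[start:end] {
			buf = append(buf, e.Key...)
			buf = append(buf, 0) // separator between key and version
			binary.BigEndian.PutUint64(v[:], e.Version)
			buf = append(buf, v[:]...)
		}
		leaves = append(leaves, sha256.Sum256(buf))
	}
	return leaves
}

// diffLeaves returns the indexes of leaf chunks that differ, including
// chunks present on only one side.
func diffLeaves(a, b [][32]byte) []int {
	n := len(a)
	if len(b) > n {
		n = len(b)
	}
	var out []int
	for i := 0; i < n; i++ {
		if i >= len(a) || i >= len(b) || a[i] != b[i] {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	local := []entry{{"k0", 1}, {"k1", 1}, {"k2", 1}, {"k3", 1}}
	remote := []entry{{"k0", 1}, {"k1", 1}, {"k2", 2}, {"k3", 1}}
	fmt.Println(diffLeaves(leafHashes(local, 2), leafHashes(remote, 2)))
}
```

Only the keys inside the reported chunks need to be fetched from the peer, which is what keeps anti-entropy cheap when replicas mostly agree.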
type Option ¶
type Option[T IBackendConstrain] func(*T)
Option is a function type that configures a backend of type T.
func WithCapacity ¶
func WithCapacity[T IBackendConstrain](capacity int) Option[T]
WithCapacity is an option that sets the capacity of the cache.
func WithClusterKeysSetName ¶ added in v0.1.8
func WithClusterKeysSetName[T RedisCluster](keysSetName string) Option[RedisCluster]
WithClusterKeysSetName sets the name of the set for cluster backend keys.
func WithClusterSerializer ¶ added in v0.1.8
func WithClusterSerializer[T RedisCluster](ser serializer.ISerializer) Option[RedisCluster]
WithClusterSerializer sets the serializer for the cluster backend.
func WithKeysSetName ¶
WithKeysSetName is an option that sets the name of the set that holds the keys of the items in the cache.
func WithRedisClient ¶
WithRedisClient is an option that sets the redis client to use.
func WithRedisClusterClient ¶ added in v0.1.8
func WithRedisClusterClient[T RedisCluster](client *redis.ClusterClient) Option[RedisCluster]
WithRedisClusterClient sets the redis cluster client to use.
func WithSerializer ¶
func WithSerializer[T Redis](backendSerializer serializer.ISerializer) Option[Redis]
WithSerializer is an option that sets the serializer to use. The serializer is used to serialize and deserialize the items in the cache.
- The default serializer is `serializer.MsgpackSerializer`.
- The `serializer.JSONSerializer` can be used to serialize and deserialize the items in the cache as JSON.
- The interface `serializer.ISerializer` can be implemented to use a custom serializer.
type Redis ¶
type Redis struct {
Serializer serializer.ISerializer // Serializer is the serializer used to serialize the items before storing them in the cache
// contains filtered or unexported fields
}
Redis is a cache backend that stores the items in a redis implementation.
func (*Redis) Capacity ¶
Capacity returns the maximum number of items that can be stored in the cache.
func (*Redis) Get ¶
Get retrieves the Item with the given key from the cacheBackend. If the item is not found, it returns nil.
func (*Redis) List ¶
List returns a list of all the items in the cacheBackend that match the given filter options.
func (*Redis) SetCapacity ¶
SetCapacity sets the capacity of the cache.
type RedisCluster ¶ added in v0.1.8
type RedisCluster struct {
Serializer serializer.ISerializer
// contains filtered or unexported fields
}
RedisCluster is a cache backend that stores items in a Redis Cluster. It mirrors the single-node Redis backend semantics but uses go-redis ClusterClient.
func (*RedisCluster) Capacity ¶ added in v0.1.8
func (cacheBackend *RedisCluster) Capacity() int
Capacity returns the capacity of the cluster backend.
func (*RedisCluster) Clear ¶ added in v0.1.8
func (cacheBackend *RedisCluster) Clear(ctx context.Context) error
Clear flushes the database.
func (*RedisCluster) Count ¶ added in v0.1.8
func (cacheBackend *RedisCluster) Count(ctx context.Context) int
Count returns the number of keys stored.
func (*RedisCluster) List ¶ added in v0.1.8
func (cacheBackend *RedisCluster) List(ctx context.Context, filters ...IFilter) ([]*cache.Item, error)
List returns items matching optional filters.
func (*RedisCluster) Remove ¶ added in v0.1.8
func (cacheBackend *RedisCluster) Remove(ctx context.Context, keys ...string) error
Remove deletes the specified keys.
func (*RedisCluster) SetCapacity ¶ added in v0.1.8
func (cacheBackend *RedisCluster) SetCapacity(capacity int)
SetCapacity sets the capacity of the cluster backend.
type SortOrderFilter ¶
type SortOrderFilter struct {
// contains filtered or unexported fields
}
SortOrderFilter is a filter that sorts the items by a given field.
func WithSortOrderAsc ¶
func WithSortOrderAsc(ascending bool) SortOrderFilter
WithSortOrderAsc returns a filter that determines whether to sort ascending or not.
func (SortOrderFilter) ApplyFilter ¶
ApplyFilter applies the sort order filter to the given list of items.
Directories ¶
| Path | Synopsis |
|---|---|
| redis | Package redis provides configuration options and utilities for Redis backend implementation. |
| rediscluster | Package rediscluster provides configuration options and utilities for Redis Cluster backend implementation. |