Documentation ¶
Overview ¶
Package podshare is a generic, type-safe cross-pod data store.
A Store[T] holds a strongly-typed map[string]T that is replicated across every pod sharing the same topic. Reads are served from a local in-memory cache (zero network hop). Writes are applied locally first, then broadcast over a pluggable Transport so peers converge on the same state.
Conflict resolution is last-write-wins ordered by (Timestamp, Origin, Seq). For richer semantics (CRDT, vector clocks) layer merge logic on top of the Watch channel.
Two transports ship with the module: a Redis pub/sub transport in transport/redis.go for shops that already run Redis, and a P2P TCP transport in transport/p2p.go for dependency-free meshes. A Memory transport in transport/memory.go is provided for tests.
When to use podshare ¶
Good fits — small mutable state that every pod needs the same view of:
- feature flags and config
- rate-limit counters and quotas
- chat-history hot cache
- presence / online-user registries
- fleet-wide circuit breakers and kill switches
- small routing tables ("user X lives on pod-3")
Bad fits — reach for something else:
- source of truth for unloseable data (no durability guarantee)
- large datasets (every pod holds the full map)
- high write churn on the same key (LWW drops concurrent updates)
- strong consistency, locks, or leader election (use etcd)
- cross-region replication (designed for in-cluster fan-out)
Trade-offs vs alternatives ¶
olric — embedded distributed KV with sharding and replicas. Pick when the dataset is too large to replicate on every pod and you want it embedded with no extra service.
NATS JetStream KV — keyed bucket with watch, history, and durability. Runs against a NATS cluster (not embedded). The "boring production" answer when state must survive pod restarts.
etcd v3 — linearizable KV with watch. Overkill for shared cache; the right pick for locks, leader election, and config that must not split-brain.
hashicorp/memberlist — gossip primitive. Use as a building block if you need gossip semantics (anti-entropy, failure detection) and you're willing to write the typed map and LWW logic yourself.
groupcache — peer cache with consistent hashing. Different model: each key lives on one peer, not full replication. Cannot substitute for podshare's "every pod has the full view."
Operational notes ¶
LWW orders by wall-clock time. Run NTP or PTP — sustained skew between pods will reorder writes.
Snapshot persistence is asynchronous and debounced (default 200ms, see WithSnapshotInterval). Bursts coalesce into a single snapshot; Close flushes any pending state.
Tombstones are kept for WithTombstoneTTL (default 24h) so concurrent late writes from peers cannot resurrect deleted keys. Expired tombstones are GCed at snapshot time. On an idle topic the snapshotter still wakes every WithGCInterval (default 5m) to compact them; with WithGCInterval set to 0, compaction waits for the next write.
Watch uses non-blocking dispatch. A consumer whose buffer fills is dropped (channel closed) and reported via WithErrorHandler; treat a closed Watch channel as "fell behind — re-subscribe and re-snapshot via Get."
Each Transport documents its own semantics. RedisTransport is at-most-once (Redis Pub/Sub does not replay missed messages); P2PTransport is plaintext unless you supply a TLSConfig with mTLS for authenticated meshes.
P2P peer discovery ¶
P2PTransport's Peers list at construction is just the initial set. To react to fleet changes — Kubernetes HPA scaling, manual kubectl scale, deploy rolls, hand-managed VMs — use AddPeer / RemovePeer from a discovery loop. The bundled transport.DNSDiscoverer covers the headless-Service-plus-DNS pattern in Kubernetes; for other sources (env list, file watch, admin HTTP, etcd/Consul) wire your own loop to transport.SyncPeers and the transport reconciles.
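The reconcile step of a custom discovery loop can be sketched as a set difference. This is only an illustration: diffPeers is a hypothetical helper, not part of the package, and the exact signatures of AddPeer, RemovePeer, and transport.SyncPeers are not shown in this documentation, so the sketch stops at computing what would be passed to them.

```go
package main

import (
	"fmt"
	"sort"
)

// diffPeers compares the transport's current peer set with the set a
// discovery source just returned, and reports which addresses to add
// and which to remove. Duplicates in either input are ignored.
func diffPeers(current, desired []string) (add, remove []string) {
	cur := make(map[string]bool, len(current))
	for _, p := range current {
		cur[p] = true
	}
	want := make(map[string]bool, len(desired))
	for _, p := range desired {
		if !want[p] && !cur[p] {
			add = append(add, p)
		}
		want[p] = true
	}
	for p := range cur {
		if !want[p] {
			remove = append(remove, p)
		}
	}
	sort.Strings(add) // deterministic order for logging and tests
	sort.Strings(remove)
	return add, remove
}

func main() {
	current := []string{"10.0.0.1:7000", "10.0.0.2:7000"}
	desired := []string{"10.0.0.2:7000", "10.0.0.3:7000"}
	add, remove := diffPeers(current, desired)
	fmt.Println("add:", add)       // candidates for AddPeer
	fmt.Println("remove:", remove) // candidates for RemovePeer
}
```

A loop built this way would poll its source (env list, file, admin endpoint), call the diff, and apply the two slices to the transport on each tick.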
Index ¶
- Constants
- Variables
- type Codec
- type Event
- type EventKind
- type JSONCodec
- type Option
- func WithCodec[T any](c Codec) Option[T]
- func WithErrorHandler[T any](fn func(error)) Option[T]
- func WithGCInterval[T any](d time.Duration) Option[T]
- func WithLogger[T any](l *slog.Logger) Option[T]
- func WithMaxClockSkew[T any](d time.Duration) Option[T]
- func WithMerger[T any](fn func(existing, incoming T) T) Option[T]
- func WithNodeID[T any](id string) Option[T]
- func WithSnapshotInterval[T any](d time.Duration) Option[T]
- func WithTombstoneTTL[T any](d time.Duration) Option[T]
- func WithWatchBuffer[T any](n int) Option[T]
- type Stats
- type Store
- func (s *Store[T]) Close() error
- func (s *Store[T]) CloseWithContext(ctx context.Context) error
- func (s *Store[T]) Delete(ctx context.Context, key string) error
- func (s *Store[T]) DeleteMany(ctx context.Context, keys []string) error
- func (s *Store[T]) Get(key string) (T, bool)
- func (s *Store[T]) Keys() []string
- func (s *Store[T]) NodeID() string
- func (s *Store[T]) Refresh(ctx context.Context) error
- func (s *Store[T]) Seed(initial map[string]T) error
- func (s *Store[T]) Set(ctx context.Context, key string, value T) error
- func (s *Store[T]) SetMany(ctx context.Context, kvs map[string]T) error
- func (s *Store[T]) Snapshot() map[string]T
- func (s *Store[T]) Stats() Stats
- func (s *Store[T]) Topic() string
- func (s *Store[T]) TrySet(ctx context.Context, key string, value T, prev Version) (bool, error)
- func (s *Store[T]) Version(key string) Version
- func (s *Store[T]) Watch(ctx context.Context, opts ...WatchOption) <-chan Event[T]
- type StrictJSONCodec
- type Transport
- type Version
- type WatchOption
Constants ¶
const ProtocolVersion uint8 = 1
ProtocolVersion identifies the wire format. Bump when a breaking change is made. Receivers reject messages whose V is higher than this constant; lower versions are accepted with default values for any fields the sender didn't populate.
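As a rough illustration of the acceptance rule described above, a receiver-side check might look like the following. The names checkVersion and errTooNew are hypothetical; the package's actual decode path is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// protocolVersion mirrors the ProtocolVersion constant documented above.
const protocolVersion uint8 = 1

// errTooNew is a hypothetical sentinel for messages from a newer sender.
var errTooNew = errors.New("message version newer than receiver")

// checkVersion accepts messages at or below the receiver's version and
// rejects anything higher.
func checkVersion(v uint8) error {
	if v > protocolVersion {
		return fmt.Errorf("v=%d: %w", v, errTooNew)
	}
	return nil
}

func main() {
	for _, v := range []uint8{0, 1, 2} {
		fmt.Println(v, checkVersion(v))
	}
}
```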
Variables ¶
var ErrBroadcast = errors.New("podshare: broadcast failed; local state updated")
ErrBroadcast wraps transport publish failures from Set/Delete/SetMany. When errors.Is(err, ErrBroadcast) is true, the local replica has already applied the write — only the fan-out to peers failed. Callers should decide retry semantics based on this distinction: retrying a successful local write produces a write with a fresh timestamp/seq, which is safe but not idempotent.
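The distinction can be handled with errors.Is. The sketch below is self-contained, so it uses local stand-ins: errBroadcast plays the role of podshare.ErrBroadcast, and fakeSet is a hypothetical substitute for Store.Set that applies the write locally and then wraps any publish failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errBroadcast is a local stand-in for podshare.ErrBroadcast.
var errBroadcast = errors.New("podshare: broadcast failed; local state updated")

// errDown simulates a transport failure.
var errDown = errors.New("connection refused")

// fakeSet is a hypothetical stand-in for Store.Set: the local write
// happens first, and a publish failure is wrapped in errBroadcast.
func fakeSet(local map[string]string, key, value string, publish func() error) error {
	local[key] = value
	if err := publish(); err != nil {
		return fmt.Errorf("%w: %v", errBroadcast, err)
	}
	return nil
}

// classify maps a write error to the outcome the caller should act on.
func classify(err error) string {
	switch {
	case err == nil:
		return "replicated"
	case errors.Is(err, errBroadcast):
		return "local-only" // applied here; peers may not have it yet
	default:
		return "failed"
	}
}

func main() {
	local := map[string]string{}
	err := fakeSet(local, "flag", "on", func() error { return errDown })
	fmt.Println(classify(err), local["flag"])
}
```

In the "local-only" case a caller might log and rely on a later snapshot or Refresh rather than retrying immediately, since a retry produces a new write with a fresh timestamp.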
var ErrClosed = errors.New("podshare: store closed")
ErrClosed is returned by methods invoked after Close.
var ErrSeedNonEmpty = errors.New("podshare: Seed called on non-empty store")
ErrSeedNonEmpty is returned by Seed when the store already holds data. Seed is meant to populate a fresh Store before any Set/Watch.
Functions ¶
This section is empty.
Types ¶
type Codec ¶
Codec encodes and decodes the per-key value type T to and from bytes before it crosses a Transport boundary or hits a snapshot. The default is JSONCodec; swap it via WithCodec for faster wire formats such as msgpack or protobuf.
Codec methods must be safe for concurrent use.
type Event ¶
Event is a single change applied to the local replica. Origin is the node ID that authored the change, including this node's own ID for changes produced locally.
type JSONCodec ¶
type JSONCodec struct{}
JSONCodec is the default Codec, backed by encoding/json. It marshals values to JSON; unmarshaling is permissive (unknown fields in the wire payload are silently dropped).
The permissive behavior is fine for additive schema changes — old pods read new payloads without crashing — but it hides field-drop during rolling deploys with mixed versions of T. See StrictJSONCodec for the loud-failure variant.
type Option ¶
type Option[T any] interface {
	// contains filtered or unexported methods
}
Option configures a Store at construction. Pass options to New.
func WithErrorHandler ¶
WithErrorHandler installs a callback that receives non-fatal errors: undecodable peer messages, snapshot failures, and dropped slow watchers. The callback runs on Store-internal goroutines and must be cheap and non-blocking. Default discards errors silently.
func WithGCInterval ¶
WithGCInterval sets how often the snapshotter wakes up to compact expired tombstones even when no writes are happening. Default 5m; set to 0 to GC only on writes.
func WithLogger ¶
WithLogger installs a slog.Logger for non-error operational events: snapshot writes, tombstone GC, watcher lifecycle. Errors still flow through WithErrorHandler. Default nil (no logging).
func WithMaxClockSkew ¶
WithMaxClockSkew sets the threshold above which an incoming peer message's timestamp triggers a report to the error handler (see WithErrorHandler). Default 1 minute. Set to 0 to disable the check.
LWW depends on wall-clock time, so sustained skew silently reorders writes; this option turns the issue into an observable signal instead of a mystery. Skewed messages are still applied — replication continues, the operator just learns something is wrong.
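A skew check of this kind could be sketched as below. It assumes timestamps travel as Unix nanoseconds and that skew is measured in both directions (peer clock ahead or behind); neither detail is confirmed by the documentation, and skewExceeded is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// skewExceeded reports whether a message timestamp differs from the
// local clock by more than limit. A limit of 0 disables the check.
// Both timestamps are Unix nanoseconds (an assumption of this sketch).
func skewExceeded(nowNanos, msgNanos int64, limit time.Duration) bool {
	if limit <= 0 {
		return false
	}
	d := time.Duration(nowNanos - msgNanos)
	if d < 0 {
		d = -d
	}
	return d > limit
}

func main() {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	behind := now.Add(-30 * time.Second).UnixNano()
	ahead := now.Add(2 * time.Minute).UnixNano()

	fmt.Println(skewExceeded(now.UnixNano(), behind, time.Minute)) // within threshold
	fmt.Println(skewExceeded(now.UnixNano(), ahead, time.Minute))  // beyond threshold
	fmt.Println(skewExceeded(now.UnixNano(), ahead, 0))            // check disabled
}
```

As the text above notes, a positive result would only be reported; the message is still applied.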
func WithMerger ¶
WithMerger installs a per-value merge function. When set, every applied Set merges the incoming value with the existing value (if any non-tombstoned value is present); the result is stored under the incoming write's timestamp/seq.
CRDT REQUIREMENTS — read carefully.
The wire format carries the unmerged incoming value, not the post-merge result. Each peer runs the merger against its own local state. For replicas to converge, the merger MUST form a join semilattice over T:
- commutative: fn(a, b) == fn(b, a)
- associative: fn(fn(a, b), c) == fn(a, fn(b, c))
- idempotent: fn(a, a) == a
Good fits: G-Set unions, OR-Set merges, monotonic counters, "max" over a numeric field, last-writer-wins per nested field where the nested timestamps converge under the same rules.
Bad fits — replicas WILL diverge: list append (order depends on arrival), "newest by client-supplied timestamp" (skew-sensitive), any operation that needs the merged history of inputs.
For an ordered chat-message log, model the value as a G-Set of {ID, Timestamp, Content}, deduplicate by ID, and sort on read; do NOT use a list-append merger.
Locking: the merger runs while the Store mutex is held. It must not call back into the Store (Get/Set/Watch/Refresh/Close) — doing so will deadlock. Keep it pure and fast.
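A merger that satisfies the three laws for the chat-log case might look like the sketch below. Message, Log, and mergeLogs are hypothetical names; the sketch assumes messages are immutable once created, so two entries with the same ID always carry the same content. The function has the shape func(existing, incoming T) T, so it could presumably be passed as podshare.WithMerger[Log](mergeLogs).

```go
package main

import (
	"fmt"
	"sort"
)

// Message is one chat entry. IDs are assumed globally unique and
// messages immutable, which is what makes the union below a valid join.
type Message struct {
	ID        string
	Timestamp int64
	Content   string
}

// Log is a grow-only set of messages keyed by ID.
type Log map[string]Message

// mergeLogs returns the union of both sets. It allocates a new map and
// never mutates its inputs, and it does not call back into any Store.
func mergeLogs(existing, incoming Log) Log {
	out := make(Log, len(existing)+len(incoming))
	for id, m := range existing {
		out[id] = m
	}
	for id, m := range incoming {
		out[id] = m
	}
	return out
}

// Ordered sorts on read, by Timestamp and then ID, so every replica
// renders the same sequence regardless of arrival order.
func (l Log) Ordered() []Message {
	msgs := make([]Message, 0, len(l))
	for _, m := range l {
		msgs = append(msgs, m)
	}
	sort.Slice(msgs, func(i, j int) bool {
		if msgs[i].Timestamp != msgs[j].Timestamp {
			return msgs[i].Timestamp < msgs[j].Timestamp
		}
		return msgs[i].ID < msgs[j].ID
	})
	return msgs
}

func main() {
	a := Log{"m1": {ID: "m1", Timestamp: 1, Content: "hi"}}
	b := Log{"m2": {ID: "m2", Timestamp: 2, Content: "yo"}}
	ab, ba := mergeLogs(a, b), mergeLogs(b, a)
	fmt.Println(len(ab), len(ba), len(mergeLogs(ab, ab)))
	for _, m := range ba.Ordered() {
		fmt.Println(m.ID, m.Content)
	}
}
```

Because the value grows without bound, a real deployment would likely cap or window the set; that policy is outside what this sketch covers.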
func WithNodeID ¶
WithNodeID overrides the auto-generated node identifier. Node IDs must be unique per process; collisions break last-write-wins tiebreaking.
func WithSnapshotInterval ¶
WithSnapshotInterval sets the debounce window between snapshot writes. Bursty writes inside the window collapse into a single Transport.Snapshot call. Default 200ms. Lower values shorten the catch-up gap for new joiners but raise transport pressure.
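The coalescing behavior can be illustrated with a small debouncer. This is one possible shape under stated assumptions, not the package's snapshotter: the debouncer type and burst helper are hypothetical, and the flush callback stands in for a Transport.Snapshot call.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// debouncer schedules at most one flush per window, no matter how many
// times markDirty is called inside that window.
type debouncer struct {
	mu      sync.Mutex
	window  time.Duration
	pending bool
	flush   func()
}

// markDirty records that state changed. The first call in a window
// arms a timer; later calls in the same window are absorbed.
func (d *debouncer) markDirty() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.pending {
		return
	}
	d.pending = true
	time.AfterFunc(d.window, func() {
		d.mu.Lock()
		d.pending = false
		d.mu.Unlock()
		d.flush()
	})
}

// burst issues n back-to-back writes and returns how many flushes ran.
func burst(n int, window time.Duration) int32 {
	var flushes int32
	d := &debouncer{window: window, flush: func() { atomic.AddInt32(&flushes, 1) }}
	for i := 0; i < n; i++ {
		d.markDirty()
	}
	time.Sleep(5 * window) // wait well past the window before counting
	return atomic.LoadInt32(&flushes)
}

func main() {
	fmt.Println("flushes:", burst(100, 50*time.Millisecond))
}
```

A shorter window means a new joiner sees fresher snapshots, at the cost of more flush calls under steady write load.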
func WithTombstoneTTL ¶
WithTombstoneTTL sets how long deleted keys are retained as tombstones. During this window, late-arriving writes for a deleted key are still rejected by LWW. After it, tombstones are GCed at the next snapshot. Default 24h.
func WithWatchBuffer ¶
WithWatchBuffer sets the capacity of channels returned by Watch. A watcher whose buffer fills at dispatch time is dropped (channel closed); size this to comfortably absorb your peak event burst.
type Stats ¶
type Stats struct {
	WritesLocal      uint64
	WritesApplied    uint64
	WritesRejected   uint64
	Reads            uint64
	EventsDispatched uint64
	WatchersActive   int
	WatchersDropped  uint64
	Snapshots        uint64
	SnapshotBytes    int64
	Keys             int
	Tombstones       int
	TombstonesGCed   uint64
}
Stats is a point-in-time snapshot of internal counters. Cheap; safe to call from a hot path.
type Store ¶
type Store[T any] struct {
	// contains filtered or unexported fields
}
Store is the type-safe replicated map. Construct with New, release with Close. All methods are safe for concurrent use.
Conflict resolution is last-write-wins ordered by (Timestamp, Origin, Seq). Wall-clock skew between pods will therefore reorder writes — run NTP/PTP in production. The third tiebreaker, Seq, is a per-node monotonic counter that prevents same-instant same-node writes from being silently dropped.
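The ordering can be read as a lexicographic comparison. The sketch below assumes field types (nanosecond timestamp, string origin, uint64 sequence) and assumes the larger tuple wins; the stamp type and newer function are illustrative names, not the package's internals.

```go
package main

import "fmt"

// stamp is a hypothetical stand-in for the ordering key of a write.
type stamp struct {
	Timestamp int64  // wall-clock time of the write (unit assumed: ns)
	Origin    string // node ID of the writer
	Seq       uint64 // per-node monotonic counter
}

// newer reports whether a wins over b under lexicographic
// (Timestamp, Origin, Seq) ordering.
func newer(a, b stamp) bool {
	if a.Timestamp != b.Timestamp {
		return a.Timestamp > b.Timestamp
	}
	if a.Origin != b.Origin {
		return a.Origin > b.Origin
	}
	return a.Seq > b.Seq
}

func main() {
	x := stamp{Timestamp: 100, Origin: "pod-1", Seq: 7}
	y := stamp{Timestamp: 100, Origin: "pod-1", Seq: 8}
	z := stamp{Timestamp: 100, Origin: "pod-2", Seq: 1}

	fmt.Println(newer(y, x)) // same instant, same node: Seq decides
	fmt.Println(newer(z, y)) // same instant, different node: Origin decides
	fmt.Println(newer(x, stamp{Timestamp: 101, Origin: "pod-0"})) // later timestamp beats everything
}
```

The comparison is total, so any two replicas that see the same pair of writes pick the same winner, which is what lets them converge without coordination.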
func New ¶
func New[T any](ctx context.Context, topic string, t Transport, opts ...Option[T]) (*Store[T], error)
New constructs a Store, joins topic, and pulls the latest snapshot from peers. The returned Store is ready for reads and writes immediately; callers should defer Close to release Store-local goroutines. The underlying Transport is owned by the caller — close it after every Store using it has been closed.
func (*Store[T]) Close ¶
Close stops background work and closes every Watch channel. The underlying Transport is not closed — callers own its lifetime. Safe to call more than once.
Close blocks until background goroutines exit, which includes a final snapshot flush via the configured Transport. If the transport may be stuck, use CloseWithContext to bound the wait.
func (*Store[T]) CloseWithContext ¶
CloseWithContext is like Close but stops waiting once ctx is done. Returns ctx.Err() if shutdown didn't complete in time; the store is still marked closed (writes return ErrClosed) and watcher channels are closed, but the snapshot loop may not have flushed.
func (*Store[T]) Delete ¶
Delete tombstones key locally and broadcasts the deletion. Subsequent Get calls return the zero value with ok=false. Tombstones are kept for TombstoneTTL so concurrent late writes from peers cannot resurrect a deleted key.
func (*Store[T]) DeleteMany ¶
DeleteMany tombstones keys atomically. Same atomicity guarantee as SetMany — readers see all-or-none on the local view.
func (*Store[T]) Get ¶
Get returns the value stored under key. The boolean is false when the key is absent or has been tombstoned.
func (*Store[T]) NodeID ¶
NodeID returns this Store's identifier — useful for filtering own-origin events from a Watch channel.
func (*Store[T]) Refresh ¶
Refresh re-fetches the snapshot from the transport and merges it into local state under LWW. Use this after detecting a transport reconnect (P2P drop/restore, Redis failover) to recover events that may have been missed during the disconnection window.
Adopted entries fire Watch events so consumers learn about catch-up state through the same channel as live events.
func (*Store[T]) Seed ¶
Seed loads initial state without broadcasting. Use it during pod startup to hydrate from a database before announcing presence. Entries are inserted with timestamp now() and this node's origin/seq so peers converge through normal LWW; Seed does NOT publish messages or run watchers. Snapshot persistence is triggered once via markDirty.
Seed must be called on an empty Store, before any Set or Watch. It returns ErrSeedNonEmpty otherwise — silently overwriting after the store has merged peer state is a footgun we'd rather make loud.
func (*Store[T]) Set ¶
Set writes value under key and broadcasts the change to peers. The write is applied locally first; the broadcast is best-effort against the transport. A returned error means the broadcast failed — the local write has already taken effect. Snapshot persistence is asynchronous: it is debounced through the snapshotter goroutine and flushed on Close.
func (*Store[T]) SetMany ¶
SetMany applies kvs atomically to the local view (other readers see either the pre-batch or post-batch state, never partial). Each entry is broadcast individually; a transport-level failure on any item is returned, but earlier items have already been applied locally and (possibly) published.
func (*Store[T]) Stats ¶
Stats returns a point-in-time snapshot of internal counters. O(1) — all counts are maintained incrementally as writes apply, so calling Stats in a tight loop (e.g. a metrics exporter) is cheap.
func (*Store[T]) TrySet ¶
TrySet writes value under key only if the current local Version matches prev. Returns (true, nil) on success, (false, nil) if the precondition failed, or an error for transport problems.
IMPORTANT — this is NOT a distributed compare-and-swap. Two pods can both observe Version V, both pass their local TrySet check, and both broadcast; LWW then picks one winner. Use TrySet to short-circuit obviously-stale writes — not as a lock primitive. For coordination that must not split-brain, reach for etcd.
Local atomicity is also weak: the version read and the subsequent Set are not held under one lock. A concurrent local Set on the same key can land between the check and the write, in which case TrySet still issues its write (LWW-ordered against the concurrent one).
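The race described above can be reproduced with two toy replicas. Everything here is a simplified model: replica, entry, trySet, and apply are hypothetical, the version is reduced to a bare timestamp, and LWW compares only (timestamp, origin).

```go
package main

import "fmt"

type entry struct {
	value  string
	ts     int64
	origin string
}

type replica struct {
	id   string
	data map[string]entry
}

// trySet writes only if the locally observed timestamp still equals prevTS.
func (r *replica) trySet(key, value string, prevTS, now int64) (entry, bool) {
	if r.data[key].ts != prevTS {
		return entry{}, false
	}
	e := entry{value: value, ts: now, origin: r.id}
	r.data[key] = e
	return e, true
}

// apply merges a peer's write under last-write-wins.
func (r *replica) apply(key string, e entry) {
	cur := r.data[key]
	if e.ts > cur.ts || (e.ts == cur.ts && e.origin > cur.origin) {
		r.data[key] = e
	}
}

// race has both pods pass their local check before either broadcast
// arrives, then exchanges the two writes.
func race() (okA, okB bool, finalA, finalB string) {
	seed := entry{value: "free", ts: 1, origin: "seed"}
	a := &replica{id: "pod-a", data: map[string]entry{"lock": seed}}
	b := &replica{id: "pod-b", data: map[string]entry{"lock": seed}}

	var ea, eb entry
	ea, okA = a.trySet("lock", "held-by-a", 1, 10)
	eb, okB = b.trySet("lock", "held-by-b", 1, 11)

	a.apply("lock", eb) // broadcasts cross on the wire
	b.apply("lock", ea)
	return okA, okB, a.data["lock"].value, b.data["lock"].value
}

func main() {
	fmt.Println(race())
}
```

Both calls report success, yet after replication only one value survives on both replicas, and the losing pod never receives a failure result from its own call.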
func (*Store[T]) Version ¶
Version returns the current Version for key. The Exists field is false when the key is absent or tombstoned.
func (*Store[T]) Watch ¶
func (s *Store[T]) Watch(ctx context.Context, opts ...WatchOption) <-chan Event[T]
Watch returns a channel that receives changes applied to the local replica, including changes authored by this node. Filter options narrow what's delivered. The channel is closed when ctx is cancelled or the Store is closed.
Slow consumers are dropped: if the watcher's buffer is full at dispatch time, the channel is closed and an error is reported through the configured error handler. Treat a closed channel as "fell behind — re-subscribe and call Refresh or Get."
Examples:
store.Watch(ctx)
store.Watch(ctx, podshare.WatchKey("alice"))
store.Watch(ctx, podshare.WatchPrefix("ws:"))
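The slow-consumer policy can be illustrated with a non-blocking send. The watcher type and dispatch function below are hypothetical and single-goroutine for brevity; the real Store presumably guards this with its own locking and reports the drop through the error handler.

```go
package main

import "fmt"

// watcher is a hypothetical subscriber with a bounded buffer.
type watcher struct {
	ch      chan string
	dropped bool
}

// dispatch tries to deliver ev without blocking. If the buffer is
// full, the watcher is dropped and its channel closed; later events
// are ignored so a closed channel is never written to.
func dispatch(w *watcher, ev string) {
	if w.dropped {
		return
	}
	select {
	case w.ch <- ev:
	default:
		w.dropped = true
		close(w.ch)
	}
}

func main() {
	w := &watcher{ch: make(chan string, 2)}
	for _, ev := range []string{"a", "b", "c", "d"} {
		dispatch(w, ev) // nobody is reading, so the third send overflows
	}
	for ev := range w.ch { // buffered events remain readable after close
		fmt.Println("got", ev)
	}
	fmt.Println("dropped:", w.dropped)
}
```

From the consumer side, the range loop ending while ctx is still live is the signal to re-subscribe and resynchronize.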
type StrictJSONCodec ¶
type StrictJSONCodec struct{}
StrictJSONCodec is like JSONCodec but rejects payloads containing fields that aren't present in T. Wire it up with WithCodec when you want schema drift to surface immediately as error-handler calls (see WithErrorHandler) instead of silent field loss during mixed-version deploys:
store, _ := podshare.New[Flag](ctx, "flags", tr,
	podshare.WithCodec[Flag](podshare.StrictJSONCodec{}),
	podshare.WithErrorHandler[Flag](func(e error) {
		slog.Warn("podshare decode", "err", e)
	}),
)
On a schema mismatch, the decode fails; handleRaw reports it through the error handler and the message is skipped. State will diverge across versions while the mismatch persists, and that is the point: loud failure beats a silently dropped field.
Pair StrictJSONCodec with a two-phase deploy: ship a version that READS the new field (still in strict mode — it'll accept payloads without the field, because they have *no extra* field) before any pod starts WRITING the new field. The strict check fires on unknown fields, not missing ones.
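The difference between unknown and missing fields can be seen with the standard library alone. The sketch below uses json.Decoder.DisallowUnknownFields, which is one way to get this behavior; whether StrictJSONCodec is implemented this way is an assumption, and the Flag fields are invented for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Flag is a hypothetical value type; the field set is illustrative.
type Flag struct {
	Name    string `json:"name"`
	Enabled bool   `json:"enabled"`
}

// decodeStrict fails on any JSON field that has no counterpart in v.
func decodeStrict(data []byte, v any) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	return dec.Decode(v)
}

func main() {
	older := []byte(`{"name":"checkout"}`)              // lacks "enabled"
	newer := []byte(`{"name":"checkout","rollout":50}`) // carries an extra field

	var f Flag
	fmt.Println("strict, missing field:", decodeStrict(older, &f))
	fmt.Println("strict, unknown field:", decodeStrict(newer, &f))
	fmt.Println("permissive, unknown field:", json.Unmarshal(newer, &f))
}
```

The first and third calls succeed and the second returns an error, which matches the two-phase deploy advice: readers must learn a field before any writer emits it.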
type Transport ¶
type Transport interface {
	Publish(ctx context.Context, topic string, msg []byte) error
	// Subscribe returns a receive-only channel of inbound messages for
	// topic. The channel is closed when the transport is closed. Calling
	// Subscribe more than once for the same topic is
	// implementation-defined; built-in transports return an error.
	Subscribe(ctx context.Context, topic string) (<-chan []byte, error)
	Snapshot(ctx context.Context, topic string, data []byte) error
	// FetchSnapshot returns the latest persisted snapshot for topic, or
	// (nil, nil) if no snapshot has been written yet.
	FetchSnapshot(ctx context.Context, topic string) ([]byte, error)
	Close() error
}
Transport carries wire messages between pods that share a topic.
Implementations are responsible for fan-out: every Subscribe channel associated with topic must receive every message that any pod publishes to topic. Self-delivery is allowed — Store filters its own messages by node ID — so implementations need not suppress echoes.
Snapshot persists the most recent full state for topic; FetchSnapshot returns it (or nil, nil if no snapshot exists). Snapshots let late-joining nodes catch up without replaying the entire event stream.
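To make the contract concrete, here is a minimal single-process implementation sketch. The interface is copied from above so the example compiles on its own; loopback is a hypothetical type, not the bundled Memory transport. Unlike the built-in transports, it chooses to allow several subscribers per topic, which the contract leaves implementation-defined.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Transport is copied from the documentation above.
type Transport interface {
	Publish(ctx context.Context, topic string, msg []byte) error
	Subscribe(ctx context.Context, topic string) (<-chan []byte, error)
	Snapshot(ctx context.Context, topic string, data []byte) error
	FetchSnapshot(ctx context.Context, topic string) ([]byte, error)
	Close() error
}

var errClosed = errors.New("loopback: closed")

// loopback fans messages out to every subscriber in this process.
type loopback struct {
	mu     sync.Mutex
	subs   map[string][]chan []byte
	snaps  map[string][]byte
	closed bool
}

func newLoopback() *loopback {
	return &loopback{subs: map[string][]chan []byte{}, snaps: map[string][]byte{}}
}

func (l *loopback) Publish(ctx context.Context, topic string, msg []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closed {
		return errClosed
	}
	for _, ch := range l.subs[topic] { // includes the publisher's own channel
		select {
		case ch <- msg:
		case <-ctx.Done(): // a full subscriber blocks until ctx ends
			return ctx.Err()
		}
	}
	return nil
}

func (l *loopback) Subscribe(ctx context.Context, topic string) (<-chan []byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closed {
		return nil, errClosed
	}
	ch := make(chan []byte, 16)
	l.subs[topic] = append(l.subs[topic], ch)
	return ch, nil
}

func (l *loopback) Snapshot(ctx context.Context, topic string, data []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.snaps[topic] = append([]byte(nil), data...) // copy: caller may reuse data
	return nil
}

func (l *loopback) FetchSnapshot(ctx context.Context, topic string) ([]byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.snaps[topic], nil // (nil, nil) when nothing was stored
}

func (l *loopback) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.closed {
		l.closed = true
		for _, chans := range l.subs {
			for _, ch := range chans {
				close(ch)
			}
		}
	}
	return nil
}

var _ Transport = (*loopback)(nil) // compile-time interface check

// demo publishes once and reports what two subscribers saw, plus
// whether FetchSnapshot returned nil before any Snapshot call.
func demo() (string, string, bool) {
	ctx := context.Background()
	tr := newLoopback()
	defer tr.Close()
	a, _ := tr.Subscribe(ctx, "flags")
	b, _ := tr.Subscribe(ctx, "flags")
	_ = tr.Publish(ctx, "flags", []byte("hello"))
	snap, _ := tr.FetchSnapshot(ctx, "flags")
	return string(<-a), string(<-b), snap == nil
}

func main() { fmt.Println(demo()) }
```

Holding the mutex across a blocking send keeps the sketch short but would stall all publishers behind one slow subscriber; a production transport would want per-subscriber queues or a drop policy.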
type Version ¶
Version identifies a specific revision of a key, useful for optimistic-concurrency writes via TrySet.
Note: distributed CAS over an eventually-consistent transport is best-effort. Two pods can both observe Version V, both pass the local TrySet check, and both broadcast. LWW on (Timestamp, Origin, Seq) then chooses a winner. Use TrySet to guard against local stale reads and to short-circuit obviously-out-of-date writes, not as a true lock.
type WatchOption ¶
type WatchOption func(*watchConfig)
WatchOption configures a Watch call. Compose with the helpers: WatchFilter, WatchKey, WatchPrefix.
func WatchFilter ¶
func WatchFilter(fn func(key string) bool) WatchOption
WatchFilter delivers only events whose Key satisfies fn. Filtering happens inside the Store, before dispatch, so events that don't match are skipped without consuming the watcher's buffer.
func WatchKey ¶
func WatchKey(key string) WatchOption
WatchKey delivers only events for the named key.
func WatchPrefix ¶
func WatchPrefix(prefix string) WatchOption
WatchPrefix delivers only events whose key starts with prefix. Useful for grouping ("ws:", "feature:checkout:", etc.).
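Since WatchOption is a function over an unexported config, the three helpers compose naturally. The sketch below shows one way such options could be built; the filters field, the matches helper, and the all-filters-must-pass rule are assumptions, because watchConfig's contents are not documented.

```go
package main

import (
	"fmt"
	"strings"
)

// watchConfig is hypothetical: the real struct is unexported and its
// fields are not shown in the documentation.
type watchConfig struct {
	filters []func(key string) bool
}

// WatchOption matches the documented type.
type WatchOption func(*watchConfig)

// WatchFilter registers an arbitrary key predicate.
func WatchFilter(fn func(key string) bool) WatchOption {
	return func(c *watchConfig) { c.filters = append(c.filters, fn) }
}

// WatchKey and WatchPrefix are expressed here as special cases of WatchFilter.
func WatchKey(key string) WatchOption {
	return WatchFilter(func(k string) bool { return k == key })
}

func WatchPrefix(prefix string) WatchOption {
	return WatchFilter(func(k string) bool { return strings.HasPrefix(k, prefix) })
}

// matches applies the options and reports whether an event for key
// would be delivered. Requiring every filter to pass is an assumption.
func matches(key string, opts ...WatchOption) bool {
	var c watchConfig
	for _, o := range opts {
		o(&c)
	}
	for _, f := range c.filters {
		if !f(key) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matches("ws:alice", WatchPrefix("ws:")))
	fmt.Println(matches("feature:checkout", WatchPrefix("ws:")))
	fmt.Println(matches("alice", WatchKey("alice")))
	fmt.Println(matches("anything")) // no options: everything is delivered
}
```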
Directories ¶
| Path | Synopsis |
|---|---|
| | Package callreply is a small RPC layer that runs over any podshare.Transport. |
| examples | |
| examples/basic (command) | Basic example: two Stores share state through an in-process MemoryTransport. |
| examples/chat-cache (command) | Chat-history hot cache. |
| examples/feature-flags (command) | Feature flags with live propagation. |
| examples/p2p (command) | P2P mesh example. |
| examples/redis (command) | Redis-backed example. |
| examples/web (command) | Interactive web demo: 3 pods share state in one process, each exposing its own HTTP UI. |
| examples/ws-routing (command) | WebSocket routing across pods. |
| | Package prom exposes a podshare Store's Stats() as a prometheus.Collector. |
| | Package transport contains pluggable Transport implementations for podshare: Memory (testing), Redis (pub/sub), and P2P (TCP mesh). |