podshare

package module
v0.1.2
Published: May 28, 2026 License: MIT Imports: 12 Imported by: 0

README

podshare

Generic, type-safe cross-pod data sharing for Go. A Store[T] is a map[string]T that is replicated across every pod sharing a topic. Reads are served locally (zero network hop); writes broadcast over a pluggable transport.

type Config struct {
    RateLimit    int             `json:"rate_limit"`
    FeatureFlags map[string]bool `json:"feature_flags"`
}

tr, _ := transport.NewRedisTransport(transport.RedisOptions{Addr: "localhost:6379"})
defer tr.Close()

store, _ := podshare.New[Config](ctx, "config", tr)
defer store.Close()

store.Set(ctx, "global", Config{RateLimit: 1000})
cfg, _ := store.Get("global")           // local, ~16ns

for ev := range store.Watch(ctx) {
    fmt.Println(ev.Kind, ev.Key, ev.Origin)
}

Status

Pre-1.0. The wire format carries a version byte (ProtocolVersion) and breaking changes will bump it. The Go API may still change.

Transports

Transport                     When to use                        Semantics
transport.NewRedisTransport   Redis is already in your stack     At-most-once Pub/Sub; snapshot in a sibling key
transport.NewP2PTransport     No external deps; in-cluster mesh  TCP, optional mTLS via TLSConfig
transport.NewMemoryTransport  Tests, single-process simulation   In-process broker

API at a glance

// Reads — all O(local map lookup)
store.Get(key)
store.Keys()
store.Snapshot()
store.Version(key)              // for TrySet
store.Stats()                   // counters: writes, reads, watchers, snapshots

// Writes — apply locally, broadcast, debounce snapshot
store.Set(ctx, key, value)
store.Delete(ctx, key)
store.SetMany(ctx, kvs)         // atomic on local view
store.DeleteMany(ctx, keys)
store.TrySet(ctx, key, value, prev)  // best-effort versioned CAS (see godoc)
store.Seed(initial)             // hydrate at startup without broadcasting

// Watch — non-blocking dispatch, slow watchers are dropped
store.Watch(ctx)                                       // all events
store.Watch(ctx, podshare.WatchKey("alice"))           // one key
store.Watch(ctx, podshare.WatchPrefix("ws:"))          // a prefix
store.Watch(ctx, podshare.WatchFilter(myPredicate))    // arbitrary filter

// Recovery
store.Refresh(ctx)              // re-fetch snapshot, merge under LWW
store.Close()

Options

  • WithCodec / WithNodeID / WithWatchBuffer — basics
  • WithSnapshotInterval / WithGCInterval / WithTombstoneTTL — snapshot + compaction tuning
  • WithMerger — CRDT-style per-value merge (replaces LWW outcome for Set)
  • WithErrorHandler — surface decode/snapshot/dropped-watcher errors
  • WithLogger — *slog.Logger for operational events

When to reach for podshare vs. something else

Good fits: feature flags, rate-limit counters, chat-history hot cache, presence registries, fleet-wide circuit breakers, small routing tables ("user X lives on pod-3").

Bad fits: durable source of truth, large datasets (every pod holds the full map), strong consistency (use etcd), cross-region replication.

For high write churn on the same key, LWW drops concurrent writes by design. WithMerger is safe only when your merge function is a true CRDT join (commutative, associative, idempotent) — peers each run the merger against their own local state. For ordered chat-message logs, model as a G-Set of {ID, Timestamp, Content} and sort on read; a list-append merger will diverge across pods.

Comparison with olric, NATS JetStream KV, etcd v3, memberlist, groupcache — see go doc github.com/thanhphuchuynh/podshare.

Subpackages

  • transport — Memory, Redis, and P2P transports
  • callreply — RPC layer for "call this method on whichever pod owns target X" (e.g., forwarding a push to the pod holding a user's WebSocket)
  • prom — Prometheus collector wrapping Stats(). Optional; only imports client_golang when you import this subpackage.
  • examples/basic, redis, p2p, chat-cache, feature-flags, ws-routing, web (interactive browser demo, 3 pods in one process)

Peer discovery (P2P transport)

The Peers slice you pass at construction is the initial set, not the full one. After construction, use AddPeer / RemovePeer (or the bundled DNSDiscoverer) to react to the fleet changing size — whether the change came from HPA, manual kubectl scale, a deploy roll, or hand-managed VMs.

The core primitives:

tr.AddPeer("10.0.0.5:9101")    // start dialing
tr.RemovePeer("10.0.0.5:9101") // cancel dial + close existing conn
tr.Peers()                     // []string of currently-configured peers

SyncPeers reconciles a discovered set against the transport's current peers on every tick — it adds new addresses and removes ones no longer present. Idempotent.

sync := transport.SyncPeers(tr)
sync([]string{"10.0.0.1:9101", "10.0.0.2:9101"})
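The reconciliation step can be pictured as a set difference between the configured peers and the discovered ones. The sketch below is not the library's implementation; diffPeers is a hypothetical helper that only illustrates why re-running the sync with an unchanged list is a no-op.

```go
package main

import (
	"fmt"
	"sort"
)

// diffPeers is a hypothetical helper (not part of podshare). It compares the
// currently configured peers with a freshly discovered list and returns the
// addresses to add and the addresses to remove, sorted for stable output.
func diffPeers(current, discovered []string) (toAdd, toRemove []string) {
	cur := make(map[string]bool, len(current))
	for _, p := range current {
		cur[p] = true
	}
	want := make(map[string]bool, len(discovered))
	for _, p := range discovered {
		if want[p] {
			continue // ignore duplicates in the discovered list
		}
		want[p] = true
		if !cur[p] {
			toAdd = append(toAdd, p)
		}
	}
	for p := range cur {
		if !want[p] {
			toRemove = append(toRemove, p)
		}
	}
	sort.Strings(toAdd)
	sort.Strings(toRemove)
	return toAdd, toRemove
}

func main() {
	add, remove := diffPeers(
		[]string{"10.0.0.1:9101", "10.0.0.2:9101"},
		[]string{"10.0.0.2:9101", "10.0.0.3:9101"},
	)
	fmt.Println("add:", add, "remove:", remove)
}
```

In a real loop the two result slices would map onto AddPeer and RemovePeer calls.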

Pick the discovery source that matches your platform:

Kubernetes — headless Service + DNS

apiVersion: v1
kind: Service
metadata:
  name: podshare-headless
spec:
  clusterIP: None        # headless: DNS returns all pod IPs
  selector: { app: my-app }
  ports: [{ port: 9101 }]

go (&transport.DNSDiscoverer{
    Host:     "podshare-headless.default.svc.cluster.local",
    Port:     9101,
    Self:     net.JoinHostPort(os.Getenv("POD_IP"), "9101"),
    Interval: 10 * time.Second,
    OnError:  func(e error) { slog.Warn("discovery", "err", e) },
}).Run(ctx, transport.SyncPeers(tr))

Works the same for HPA, manual kubectl scale, deploy rolls — kube-DNS updates A records the moment a pod becomes Ready, and the discoverer picks it up on its next tick.

Static list from env / config

For small fleets you scale by hand:

// PODSHARE_PEERS=10.0.0.1:9101,10.0.0.2:9101
for _, p := range strings.Split(os.Getenv("PODSHARE_PEERS"), ",") {
    // Skip empty entries (unset variable, trailing comma).
    if p = strings.TrimSpace(p); p != "" {
        tr.AddPeer(p)
    }
}

To grow, redeploy with a new env var. AddPeer is idempotent, so re-applying the full list at startup is safe.

File watch

When the peer list lives in a shared file (Ansible-rendered, NFS, etc.):

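go func() {
    sync := transport.SyncPeers(tr)
    tick := time.NewTicker(10 * time.Second)
    defer tick.Stop()
    for {
        // readLines is your own helper: one address per line.
        sync(readLines("/etc/podshare/peers"))
        select {
        case <-ctx.Done():
            return // stop polling when the caller shuts down
        case <-tick.C:
        }
    }
}()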
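The loop above leaves readLines undefined. One possible shape for it is sketched here under a different name, readPeerFile, because this variant also returns an error: syncing an empty list after a failed read would remove every peer, so the caller may prefer to skip that tick. Both function names and the "#" comment convention are assumptions, not part of podshare.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parsePeers returns the non-empty, non-comment lines of text with
// surrounding whitespace trimmed. Lines starting with "#" are treated as
// comments (an assumed convention for the peer file).
func parsePeers(text string) []string {
	var peers []string
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		peers = append(peers, line)
	}
	return peers
}

// readPeerFile reads path and parses it with parsePeers. The error lets the
// caller distinguish "file unreadable" from "file lists no peers".
func readPeerFile(path string) ([]string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	return parsePeers(string(data)), nil
}

func main() {
	peers, err := readPeerFile("/etc/podshare/peers")
	if err != nil {
		fmt.Println("skipping sync:", err)
		return
	}
	fmt.Println(peers)
}
```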

Admin HTTP endpoint

For ops-driven scale (lock it behind auth in production):

mux.HandleFunc("POST /admin/peers", func(w http.ResponseWriter, r *http.Request) {
    tr.AddPeer(r.URL.Query().Get("addr"))
    w.WriteHeader(http.StatusNoContent)
})
mux.HandleFunc("DELETE /admin/peers", func(w http.ResponseWriter, r *http.Request) {
    tr.RemovePeer(r.URL.Query().Get("addr"))
    w.WriteHeader(http.StatusNoContent)
})

Centralized registry (etcd / Consul / Redis)

For larger or multi-tenant clusters, watch a key prefix and feed updates into the same SyncPeers callback. Any source that produces a list of addresses works — the transport's API is intentionally narrow.

Operational gotchas

  • Clock skew: LWW orders by wall-clock time. Run NTP/PTP.
  • Redis Pub/Sub is at-most-once: messages between drop and reconnect are lost. Call store.Refresh(ctx) after detecting reconnection.
  • P2P is plaintext by default: pass a TLSConfig with mTLS for authenticated meshes.
  • Watchers must drain: a full watcher channel gets dropped (channel closed) and reported via WithErrorHandler. Treat that as "fell behind — re-subscribe and re-snapshot."
  • Tombstones: keys remain as tombstones for WithTombstoneTTL (24h default) and are GC'd on the snapshot tick. Idle topics still compact via WithGCInterval (5m default).

Run tests / benchmarks / fuzz

go test -race ./...
go test -bench=. -benchtime=1s ./...
go test -fuzz=FuzzReadFrame -fuzztime=30s -run=^$ ./transport

Documentation

Overview

Package podshare is a generic, type-safe cross-pod data store.

A Store[T] holds a strongly-typed map[string]T that is replicated across every pod sharing the same topic. Reads are served from a local in-memory cache (zero network hop). Writes are applied locally first, then broadcast over a pluggable Transport so peers converge on the same state.

Conflict resolution is last-write-wins ordered by (Timestamp, Origin, Seq). For richer semantics (CRDT, vector clocks), install a merge function with WithMerger or layer merge logic on top of the Watch channel.

Two transports ship with the module: a Redis pub/sub transport in transport/redis.go for shops that already run Redis, and a P2P TCP transport in transport/p2p.go for dependency-free meshes. A Memory transport in transport/memory.go is provided for tests.

When to use podshare

Good fits — small mutable state that every pod needs the same view of:

  • feature flags and config
  • rate-limit counters and quotas
  • chat-history hot cache
  • presence / online-user registries
  • fleet-wide circuit breakers and kill switches
  • small routing tables ("user X lives on pod-3")

Bad fits — reach for something else:

  • source of truth for unloseable data (no durability guarantee)
  • large datasets (every pod holds the full map)
  • high write churn on the same key (LWW drops concurrent updates)
  • strong consistency, locks, or leader election (use etcd)
  • cross-region replication (designed for in-cluster fan-out)

Trade-offs vs alternatives

olric — embedded distributed KV with sharding and replicas. Pick when the dataset is too large to replicate on every pod and you want it embedded with no extra service.

NATS JetStream KV — keyed bucket with watch, history, and durability. Runs against a NATS cluster (not embedded). The "boring production" answer when state must survive pod restarts.

etcd v3 — linearizable KV with watch. Overkill for shared cache; the right pick for locks, leader election, and config that must not split-brain.

hashicorp/memberlist — gossip primitive. Use as a building block if you need gossip semantics (anti-entropy, failure detection) and you're willing to write the typed map and LWW logic yourself.

groupcache — peer cache with consistent hashing. Different model: each key lives on one peer, not full replication. Cannot substitute for podshare's "every pod has the full view."

Operational notes

LWW orders by wall-clock time. Run NTP or PTP — sustained skew between pods will reorder writes.

Snapshot persistence is asynchronous and debounced (default 200ms, see WithSnapshotInterval). Bursts coalesce into a single snapshot; Close flushes any pending state.

Tombstones are kept for WithTombstoneTTL (default 24h) so concurrent late writes from peers cannot resurrect deleted keys. Expired tombstones are GCed at snapshot time. Idle topics still compact on the WithGCInterval tick (default 5m); if that interval is set to 0, an idle topic will not compact until the next write.

Watch uses non-blocking dispatch. A consumer whose buffer fills is dropped (channel closed) and reported via WithErrorHandler; treat a closed Watch channel as "fell behind — re-subscribe and re-snapshot via Get."

Each Transport documents its own semantics. RedisTransport is at-most-once (Redis Pub/Sub does not replay missed messages); P2PTransport is plaintext unless you supply a TLSConfig with mTLS for authenticated meshes.

P2P peer discovery

P2PTransport's Peers list at construction is just the initial set. To react to fleet changes — Kubernetes HPA scaling, manual kubectl scale, deploy rolls, hand-managed VMs — use AddPeer / RemovePeer from a discovery loop. The bundled transport.DNSDiscoverer covers the headless-Service-plus-DNS pattern in Kubernetes; for other sources (env list, file watch, admin HTTP, etcd/Consul) wire your own loop to transport.SyncPeers and the transport reconciles.

Index

Constants

const ProtocolVersion uint8 = 1

ProtocolVersion identifies the wire format. Bump when a breaking change is made. Receivers reject messages whose V is higher than this constant; lower versions are accepted with default values for any fields the sender didn't populate.

Variables

var ErrBroadcast = errors.New("podshare: broadcast failed; local state updated")

ErrBroadcast wraps transport publish failures from Set/Delete/SetMany. When errors.Is(err, ErrBroadcast) is true, the local replica has already applied the write — only the fan-out to peers failed. Callers should decide retry semantics based on this distinction: retrying a successful local write produces a write with a fresh timestamp/seq, which is safe but not idempotent.
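A caller can branch on this distinction with errors.Is. The sketch below uses a local stand-in, errBroadcast, so that it compiles without the module; with the real package the check would be errors.Is(err, podshare.ErrBroadcast). The describe and wrap helpers are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
)

// errBroadcast stands in for podshare.ErrBroadcast in this sketch.
var errBroadcast = errors.New("podshare: broadcast failed; local state updated")

// wrap mimics a write returning a transport failure wrapped in errBroadcast.
func wrap(cause error) error {
	return fmt.Errorf("%w: %v", errBroadcast, cause)
}

// describe classifies the error returned by a write such as Set.
func describe(err error) string {
	switch {
	case err == nil:
		return "applied locally and broadcast"
	case errors.Is(err, errBroadcast):
		return "applied locally; fan-out failed"
	default:
		return "other failure"
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(wrap(errors.New("connection reset"))))
	fmt.Println(describe(errors.New("podshare: store closed")))
}
```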

var ErrClosed = errors.New("podshare: store closed")

ErrClosed is returned by methods invoked after Close.

var ErrSeedNonEmpty = errors.New("podshare: Seed called on non-empty store")

ErrSeedNonEmpty is returned by Seed when the store already holds data. Seed is meant to populate a fresh Store before any Set/Watch.

Functions

This section is empty.

Types

type Codec

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(b []byte, v any) error
}

Codec encodes and decodes the per-key value type T to and from bytes before it crosses a Transport boundary or hits a snapshot. The default is JSONCodec; swap it via WithCodec for faster wire formats such as msgpack or protobuf.

Codec methods must be safe for concurrent use.
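As an illustration of a custom codec, the sketch below builds one on the standard library's encoding/gob. GobCodec is a hypothetical type, not something the module ships; it has the same two methods as the Codec interface, so it could presumably be passed to WithCodec. Each call creates a fresh encoder or decoder, which keeps the methods free of shared state.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// GobCodec is an illustrative codec backed by encoding/gob.
type GobCodec struct{}

// Marshal encodes v with a fresh gob encoder.
func (GobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes b into v, which must be a pointer.
func (GobCodec) Unmarshal(b []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(b)).Decode(v)
}

// Config mirrors the value type from the README example.
type Config struct {
	RateLimit    int
	FeatureFlags map[string]bool
}

// roundTrip encodes in and decodes it back, returning the decoded copy.
func roundTrip(in Config) (Config, error) {
	var c GobCodec
	b, err := c.Marshal(in)
	if err != nil {
		return Config{}, err
	}
	var out Config
	err = c.Unmarshal(b, &out)
	return out, err
}

func main() {
	out, err := roundTrip(Config{RateLimit: 1000, FeatureFlags: map[string]bool{"beta": true}})
	fmt.Println(out, err)
}
```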

type Event

type Event[T any] struct {
	Kind      EventKind
	Key       string
	Value     T
	Origin    string
	Timestamp time.Time
}

Event is a single change applied to the local replica. Origin is the node ID that authored the change, including this node's own ID for changes produced locally.

type EventKind

type EventKind int

EventKind identifies the shape of a change event.

const (
	EventSet EventKind = iota
	EventDelete
)

func (EventKind) String

func (k EventKind) String() string

type JSONCodec

type JSONCodec struct{}

JSONCodec is the default Codec, backed by encoding/json. It marshals values to JSON; unmarshalling is permissive (unknown fields in the wire payload are silently dropped).

The permissive behavior is fine for additive schema changes — old pods read new payloads without crashing — but it hides field-drop during rolling deploys with mixed versions of T. See StrictJSONCodec for the loud-failure variant.

func (JSONCodec) Marshal

func (JSONCodec) Marshal(v any) ([]byte, error)

func (JSONCodec) Unmarshal

func (JSONCodec) Unmarshal(b []byte, v any) error

type Option

type Option[T any] interface {
	// contains filtered or unexported methods
}

Option configures a Store at construction. Pass options to New.

func WithCodec

func WithCodec[T any](c Codec) Option[T]

WithCodec replaces the default JSON codec.

func WithErrorHandler

func WithErrorHandler[T any](fn func(error)) Option[T]

WithErrorHandler installs a callback that receives non-fatal errors: undecodable peer messages, snapshot failures, and dropped slow watchers. The callback runs on Store-internal goroutines and must be cheap and non-blocking. Default discards errors silently.

func WithGCInterval

func WithGCInterval[T any](d time.Duration) Option[T]

WithGCInterval sets how often the snapshotter wakes up to compact expired tombstones even when no writes are happening. Default 5m; set to 0 to GC only on writes.

func WithLogger

func WithLogger[T any](l *slog.Logger) Option[T]

WithLogger installs a slog.Logger for non-error operational events: snapshot writes, tombstone GC, watcher lifecycle. Errors still flow through WithErrorHandler. Default nil (no logging).

func WithMaxClockSkew

func WithMaxClockSkew[T any](d time.Duration) Option[T]

WithMaxClockSkew sets the maximum tolerated difference between an incoming peer message's timestamp and the local clock; a message beyond it is reported to the error handler (see WithErrorHandler). Default 1 minute. Set to 0 to disable the check.

LWW depends on wall-clock time, so sustained skew silently reorders writes; this option turns the issue into an observable signal instead of a mystery. Skewed messages are still applied — replication continues, the operator just learns something is wrong.

func WithMerger

func WithMerger[T any](fn func(existing, incoming T) T) Option[T]

WithMerger installs a per-value merge function. When set, every applied Set merges the incoming value with the existing value (if any non-tombstoned value is present); the result is stored under the incoming write's timestamp/seq.

CRDT REQUIREMENTS — read carefully.

The wire format carries the unmerged incoming value, not the post-merge result. Each peer runs the merger against its own local state. For replicas to converge, the merger MUST form a join semilattice over T:

commutative:   fn(a, b) == fn(b, a)
associative:   fn(fn(a, b), c) == fn(a, fn(b, c))
idempotent:    fn(a, a) == a

Good fits: G-Set unions, OR-Set merges, monotonic counters, "max" over a numeric field, last-writer-wins per nested field where the nested timestamps converge under the same rules.

Bad fits — replicas WILL diverge: list append (order depends on arrival), "newest by client-supplied timestamp" (skew-sensitive), any operation that needs the merged history of inputs.
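Before wiring a merger into a Store it can help to spot-check the three laws in a unit test. The helper below, lawsHold, is a hypothetical test aid: it checks the laws over a handful of sample values, which can catch a bad merger but does not prove a good one.

```go
package main

import "fmt"

// lawsHold checks idempotence, commutativity, and associativity of fn over
// every combination of samples. Passing is evidence, not proof.
func lawsHold[T comparable](fn func(a, b T) T, samples []T) bool {
	for _, a := range samples {
		if fn(a, a) != a {
			return false // not idempotent
		}
		for _, b := range samples {
			if fn(a, b) != fn(b, a) {
				return false // not commutative
			}
			for _, c := range samples {
				if fn(fn(a, b), c) != fn(a, fn(b, c)) {
					return false // not associative
				}
			}
		}
	}
	return true
}

// maxMerge keeps the larger value: a valid join for monotonic counters.
func maxMerge(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// concatMerge appends incoming to existing, like a list-append merger.
// The result depends on arrival order, so it is not a join.
func concatMerge(a, b string) string { return a + b }

func main() {
	fmt.Println(lawsHold(maxMerge, []int{0, 3, 7}))        // true
	fmt.Println(lawsHold(concatMerge, []string{"x", "y"})) // false
}
```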

For an ordered chat-message log, model the value as a G-Set of {ID, Timestamp, Content}, deduplicate by ID, and sort on read; do NOT use a list-append merger.
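A minimal sketch of that G-Set model follows. The Message fields come from the text; the Log type, MergeLogs, and Ordered are illustrative names. The sketch assumes message IDs are unique and messages are immutable once written, so when both sides hold the same ID either copy is acceptable. MergeLogs has the func(existing, incoming T) T shape that WithMerger expects, with T being Log.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Message is one chat entry; ID is assumed unique and the entry immutable.
type Message struct {
	ID        string
	Timestamp time.Time
	Content   string
}

// Log is a grow-only set of messages keyed by ID.
type Log map[string]Message

// MergeLogs returns the union of both logs without mutating either input.
func MergeLogs(existing, incoming Log) Log {
	out := make(Log, len(existing)+len(incoming))
	for id, m := range existing {
		out[id] = m
	}
	for id, m := range incoming {
		out[id] = m
	}
	return out
}

// Ordered returns the messages sorted by Timestamp, breaking ties by ID so
// every pod renders the same order.
func Ordered(l Log) []Message {
	msgs := make([]Message, 0, len(l))
	for _, m := range l {
		msgs = append(msgs, m)
	}
	sort.Slice(msgs, func(i, j int) bool {
		if !msgs[i].Timestamp.Equal(msgs[j].Timestamp) {
			return msgs[i].Timestamp.Before(msgs[j].Timestamp)
		}
		return msgs[i].ID < msgs[j].ID
	})
	return msgs
}

func main() {
	a := Log{"m2": {ID: "m2", Timestamp: time.Unix(20, 0), Content: "world"}}
	b := Log{"m1": {ID: "m1", Timestamp: time.Unix(10, 0), Content: "hello"}}
	for _, m := range Ordered(MergeLogs(a, b)) {
		fmt.Println(m.ID, m.Content)
	}
}
```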

Locking: the merger runs while the Store mutex is held. It must not call back into the Store (Get/Set/Watch/Refresh/Close) — doing so will deadlock. Keep it pure and fast.

func WithNodeID

func WithNodeID[T any](id string) Option[T]

WithNodeID overrides the auto-generated node identifier. Node IDs must be unique per process; collisions break last-write-wins tiebreaking.

func WithSnapshotInterval

func WithSnapshotInterval[T any](d time.Duration) Option[T]

WithSnapshotInterval sets the debounce window between snapshot writes. Bursty writes inside the window collapse into a single Transport.Snapshot call. Default 200ms. Lower values shorten the catch-up gap for new joiners but raise transport pressure.

func WithTombstoneTTL

func WithTombstoneTTL[T any](d time.Duration) Option[T]

WithTombstoneTTL sets how long deleted keys are retained as tombstones. During this window, late-arriving writes for a deleted key are still rejected by LWW. After it, tombstones are GCed at the next snapshot. Default 24h.

func WithWatchBuffer

func WithWatchBuffer[T any](n int) Option[T]

WithWatchBuffer sets the capacity of channels returned by Watch. A watcher whose buffer fills at dispatch time is dropped (channel closed); size this to comfortably absorb your peak event burst.
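The drop-on-full behavior can be pictured with a plain buffered channel and a select with a default case. This is a sketch of the semantics described above, not the Store's dispatcher; dispatch is a hypothetical helper and must not be called again on a channel it has already closed.

```go
package main

import "fmt"

// dispatch tries to deliver ev without blocking. If the buffer is full it
// closes the channel and returns false, so the consumer sees the close after
// draining whatever was already buffered.
func dispatch(ch chan string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		close(ch)
		return false
	}
}

func main() {
	ch := make(chan string, 2) // capacity plays the role of WithWatchBuffer
	for _, ev := range []string{"a", "b", "c"} {
		if !dispatch(ch, ev) {
			fmt.Println("watcher dropped at", ev)
			break
		}
	}
	// Buffered events are still readable before the close is observed.
	for ev := range ch {
		fmt.Println("got", ev)
	}
}
```

A larger buffer only postpones the drop for a consumer that never catches up; it helps with bursts, not with sustained lag.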

type Stats

type Stats struct {
	WritesLocal      uint64
	WritesApplied    uint64
	WritesRejected   uint64
	Reads            uint64
	EventsDispatched uint64
	WatchersActive   int
	WatchersDropped  uint64
	Snapshots        uint64
	SnapshotBytes    int64
	Keys             int
	Tombstones       int
	TombstonesGCed   uint64
}

Stats is a point-in-time snapshot of internal counters. Cheap; safe to call from a hot path.

type Store

type Store[T any] struct {
	// contains filtered or unexported fields
}

Store is the type-safe replicated map. Construct with New, release with Close. All methods are safe for concurrent use.

Conflict resolution is last-write-wins ordered by (Timestamp, Origin, Seq). Wall-clock skew between pods will therefore reorder writes — run NTP/PTP in production. The third tiebreaker, Seq, is a per-node monotonic counter that prevents same-instant same-node writes from being silently dropped.
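The ordering can be read as a lexicographic comparison of the three fields. The sketch below is one plausible comparator, not the library's code; in particular, which Origin wins a timestamp tie is an assumption here (the larger string), and version, at, and newer are illustrative names.

```go
package main

import (
	"fmt"
	"time"
)

// version mirrors the ordering fields named above.
type version struct {
	Timestamp time.Time
	Origin    string
	Seq       uint64
}

// at is a small helper that builds a timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// newer reports whether a wins over b: later Timestamp first, then Origin
// (assumed: larger string wins), then the per-node Seq counter.
func newer(a, b version) bool {
	if !a.Timestamp.Equal(b.Timestamp) {
		return a.Timestamp.After(b.Timestamp)
	}
	if a.Origin != b.Origin {
		return a.Origin > b.Origin
	}
	return a.Seq > b.Seq
}

func main() {
	x := version{Timestamp: at(100), Origin: "pod-a", Seq: 1}
	y := version{Timestamp: at(100), Origin: "pod-a", Seq: 2}
	fmt.Println(newer(y, x)) // same instant, same node: Seq decides
}
```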

func New

func New[T any](ctx context.Context, topic string, t Transport, opts ...Option[T]) (*Store[T], error)

New constructs a Store, joins topic, and pulls the latest snapshot from peers. The returned Store is ready for reads and writes immediately; callers should defer Close to release Store-local goroutines. The underlying Transport is owned by the caller — close it after every Store using it has been closed.

func (*Store[T]) Close

func (s *Store[T]) Close() error

Close stops background work and closes every Watch channel. The underlying Transport is not closed — callers own its lifetime. Safe to call more than once.

Close blocks until background goroutines exit, which includes a final snapshot flush via the configured Transport. If the transport may be stuck, use CloseWithContext to bound the wait.

func (*Store[T]) CloseWithContext

func (s *Store[T]) CloseWithContext(ctx context.Context) error

CloseWithContext is like Close but stops waiting once ctx is done. Returns ctx.Err() if shutdown didn't complete in time; the store is still marked closed (writes return ErrClosed) and watcher channels are closed, but the snapshot loop may not have flushed.

func (*Store[T]) Delete

func (s *Store[T]) Delete(ctx context.Context, key string) error

Delete tombstones key locally and broadcasts the deletion. Subsequent Get calls return the zero value with ok=false. Tombstones are kept for TombstoneTTL so concurrent late writes from peers cannot resurrect a deleted key.

func (*Store[T]) DeleteMany

func (s *Store[T]) DeleteMany(ctx context.Context, keys []string) error

DeleteMany tombstones keys atomically. Same atomicity guarantee as SetMany — readers see all-or-none on the local view.

func (*Store[T]) Get

func (s *Store[T]) Get(key string) (T, bool)

Get returns the value stored under key. The boolean is false when the key is absent or has been tombstoned.

func (*Store[T]) Keys

func (s *Store[T]) Keys() []string

Keys returns the live (non-tombstoned) keys. Order is undefined.

func (*Store[T]) NodeID

func (s *Store[T]) NodeID() string

NodeID returns this Store's identifier — useful for filtering own-origin events from a Watch channel.

func (*Store[T]) Refresh

func (s *Store[T]) Refresh(ctx context.Context) error

Refresh re-fetches the snapshot from the transport and merges it into local state under LWW. Use this after detecting a transport reconnect (P2P drop/restore, Redis failover) to recover events that may have been missed during the disconnection window.

Adopted entries fire Watch events so consumers learn about catch-up state through the same channel as live events.

func (*Store[T]) Seed

func (s *Store[T]) Seed(initial map[string]T) error

Seed loads initial state without broadcasting. Use it during pod startup to hydrate from a database before announcing presence. Entries are inserted with timestamp now() and this node's origin/seq so peers converge through normal LWW; Seed does NOT publish messages or run watchers. Snapshot persistence is triggered once via markDirty.

Seed must be called on an empty Store, before any Set or Watch. It returns ErrSeedNonEmpty otherwise — silently overwriting after the store has merged peer state is a footgun we'd rather make loud.

func (*Store[T]) Set

func (s *Store[T]) Set(ctx context.Context, key string, value T) error

Set writes value under key and broadcasts the change to peers. The write is applied locally first; the broadcast is best-effort against the transport. A returned error means the broadcast failed — the local write has already taken effect. Snapshot persistence is asynchronous: it is debounced through the snapshotter goroutine and flushed on Close.

func (*Store[T]) SetMany

func (s *Store[T]) SetMany(ctx context.Context, kvs map[string]T) error

SetMany applies kvs atomically to the local view (other readers see either the pre-batch or post-batch state, never partial). Each entry is broadcast individually; a transport-level failure on any item is returned, but earlier items have already been applied locally and (possibly) published.

func (*Store[T]) Snapshot

func (s *Store[T]) Snapshot() map[string]T

Snapshot returns a copy of the live key-value pairs.

func (*Store[T]) Stats

func (s *Store[T]) Stats() Stats

Stats returns a point-in-time snapshot of internal counters. O(1) — all counts are maintained incrementally as writes apply, so calling Stats in a tight loop (e.g. a metrics exporter) is cheap.

func (*Store[T]) Topic

func (s *Store[T]) Topic() string

Topic returns the topic this Store is bound to.

func (*Store[T]) TrySet

func (s *Store[T]) TrySet(ctx context.Context, key string, value T, prev Version) (bool, error)

TrySet writes value under key only if the current local Version matches prev. Returns (true, nil) on success, (false, nil) if the precondition failed, or an error for transport problems.

IMPORTANT — this is NOT a distributed compare-and-swap. Two pods can both observe Version V, both pass their local TrySet check, and both broadcast; LWW then picks one winner. Use TrySet to short-circuit obviously-stale writes — not as a lock primitive. For coordination that must not split-brain, reach for etcd.

Local atomicity is also weak: the version read and the subsequent Set are not held under one lock. A concurrent local Set on the same key can land between the check and the write, in which case TrySet still issues its write (LWW-ordered against the concurrent one).

func (*Store[T]) Version

func (s *Store[T]) Version(key string) Version

Version returns the current Version for key. The Exists field is false when the key is absent or tombstoned.

func (*Store[T]) Watch

func (s *Store[T]) Watch(ctx context.Context, opts ...WatchOption) <-chan Event[T]

Watch returns a channel that receives changes applied to the local replica, including changes authored by this node. Filter options narrow what's delivered. The channel is closed when ctx is cancelled or the Store is closed.

Slow consumers are dropped: if the watcher's buffer is full at dispatch time, the channel is closed and an error is reported through the configured error handler. Treat a closed channel as "fell behind — re-subscribe and call Refresh or Get."

Examples:

store.Watch(ctx)
store.Watch(ctx, podshare.WatchKey("alice"))
store.Watch(ctx, podshare.WatchPrefix("ws:"))

type StrictJSONCodec

type StrictJSONCodec struct{}

StrictJSONCodec is like JSONCodec but rejects payloads containing fields that aren't present in T. Wire it up with WithCodec when you want schema drift to surface immediately as error-handler calls (see WithErrorHandler) instead of silent field loss during mixed-version deploys:

store, _ := podshare.New[Flag](ctx, "flags", tr,
    podshare.WithCodec[Flag](podshare.StrictJSONCodec{}),
    podshare.WithErrorHandler[Flag](func(e error) {
        slog.Warn("podshare decode", "err", e)
    }),
)

On a schema mismatch, the decode fails; the Store reports it to the WithErrorHandler callback and the message is skipped. State will diverge across versions while the mismatch persists, and that is the point: loud failure beats a silently dropped field.

Pair StrictJSONCodec with a two-phase deploy: ship a version that READS the new field (still in strict mode — it'll accept payloads without the field, because they have *no extra* field) before any pod starts WRITING the new field. The strict check fires on unknown fields, not missing ones.
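The unknown-versus-missing distinction matches what encoding/json offers through Decoder.DisallowUnknownFields. Whether StrictJSONCodec is built this way is an assumption; the sketch only demonstrates the behavior with the standard library, using a hypothetical Flag type and strictUnmarshal helper.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Flag is a hypothetical value type for this sketch.
type Flag struct {
	Enabled bool   `json:"enabled"`
	Owner   string `json:"owner"`
}

// strictUnmarshal decodes b into v and fails if b carries a field that v
// does not declare. Fields missing from b are left at their zero value.
func strictUnmarshal(b []byte, v any) error {
	dec := json.NewDecoder(bytes.NewReader(b))
	dec.DisallowUnknownFields()
	return dec.Decode(v)
}

func main() {
	var f Flag
	// Missing "owner": accepted.
	fmt.Println(strictUnmarshal([]byte(`{"enabled":true}`), &f))
	// Unknown "rollout": rejected.
	fmt.Println(strictUnmarshal([]byte(`{"enabled":true,"rollout":50}`), &f))
}
```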

func (StrictJSONCodec) Marshal

func (StrictJSONCodec) Marshal(v any) ([]byte, error)

func (StrictJSONCodec) Unmarshal

func (StrictJSONCodec) Unmarshal(b []byte, v any) error

type Transport

type Transport interface {
	Publish(ctx context.Context, topic string, msg []byte) error

	// Subscribe returns a receive-only channel of inbound messages for
	// topic. The channel is closed when the transport is closed. Calling
	// Subscribe more than once for the same topic is implementation-
	// defined; built-in transports return an error.
	Subscribe(ctx context.Context, topic string) (<-chan []byte, error)

	Snapshot(ctx context.Context, topic string, data []byte) error

	// FetchSnapshot returns the latest persisted snapshot for topic, or
	// (nil, nil) if no snapshot has been written yet.
	FetchSnapshot(ctx context.Context, topic string) ([]byte, error)

	Close() error
}

Transport carries wire messages between pods that share a topic.

Implementations are responsible for fan-out: every Subscribe channel associated with topic must receive every message that any pod publishes to topic. Self-delivery is allowed — Store filters its own messages by node ID — so implementations need not suppress echoes.

Snapshot persists the most recent full state for topic; FetchSnapshot returns it (or nil, nil if no snapshot exists). Snapshots let late-joining nodes catch up without replaying the entire event stream.
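To make the contract concrete, here is a toy in-process type with the same method set as the Transport interface. It is a hypothetical sketch (localTransport is not part of the module) and transport.NewMemoryTransport remains the better choice for real tests. Unlike the built-in transports, it allows several subscribers per topic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errTransportClosed = errors.New("localTransport: closed")

// localTransport fans messages out to in-process subscribers and keeps the
// latest snapshot per topic in memory.
type localTransport struct {
	mu        sync.Mutex
	closed    bool
	subs      map[string][]chan []byte
	snapshots map[string][]byte
}

func newLocalTransport() *localTransport {
	return &localTransport{
		subs:      make(map[string][]chan []byte),
		snapshots: make(map[string][]byte),
	}
}

// Publish delivers msg to every subscriber of topic. The lock is held while
// sending so Close cannot close a channel mid-send; a slow subscriber
// therefore blocks publishers until ctx is done.
func (t *localTransport) Publish(ctx context.Context, topic string, msg []byte) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closed {
		return errTransportClosed
	}
	for _, ch := range t.subs[topic] {
		select {
		case ch <- msg:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// Subscribe registers a buffered channel that is closed by Close.
func (t *localTransport) Subscribe(ctx context.Context, topic string) (<-chan []byte, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closed {
		return nil, errTransportClosed
	}
	ch := make(chan []byte, 16)
	t.subs[topic] = append(t.subs[topic], ch)
	return ch, nil
}

// Snapshot stores a private copy of data as the latest state for topic.
func (t *localTransport) Snapshot(ctx context.Context, topic string, data []byte) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.snapshots[topic] = append([]byte(nil), data...)
	return nil
}

// FetchSnapshot returns (nil, nil) when no snapshot has been written.
func (t *localTransport) FetchSnapshot(ctx context.Context, topic string) ([]byte, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.snapshots[topic], nil
}

// Close closes every subscriber channel; calling it again is a no-op.
func (t *localTransport) Close() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closed {
		return nil
	}
	t.closed = true
	for _, chans := range t.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	return nil
}

// demo subscribes, publishes one message, and returns what was received.
func demo() (string, error) {
	ctx := context.Background()
	tr := newLocalTransport()
	defer tr.Close()
	ch, err := tr.Subscribe(ctx, "config")
	if err != nil {
		return "", err
	}
	if err := tr.Publish(ctx, "config", []byte("hello")); err != nil {
		return "", err
	}
	return string(<-ch), nil
}

func main() {
	fmt.Println(demo())
}
```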

type Version

type Version struct {
	Timestamp time.Time
	Origin    string
	Seq       uint64
	Exists    bool
}

Version identifies a specific revision of a key, useful for optimistic-concurrency writes via TrySet.

Note: distributed CAS over an eventually-consistent transport is best-effort. Two pods can both observe Version V, both pass the local TrySet check, and both broadcast. LWW on (Timestamp, Origin, Seq) then chooses a winner. Use TrySet to guard against local stale reads and to short-circuit obviously-out-of-date writes, not as a true lock.

type WatchOption

type WatchOption func(*watchConfig)

WatchOption configures a Watch call. Compose with the helpers: WatchFilter, WatchKey, WatchPrefix.

func WatchFilter

func WatchFilter(fn func(key string) bool) WatchOption

WatchFilter delivers only events whose Key satisfies fn. Filtering happens inside the Store at dispatch time, so events that don't match are skipped without consuming the watcher's buffer.

func WatchKey

func WatchKey(key string) WatchOption

WatchKey delivers only events for the named key.

func WatchPrefix

func WatchPrefix(prefix string) WatchOption

WatchPrefix delivers only events whose key starts with prefix. Useful for grouping ("ws:", "feature:checkout:", etc.).

Directories

Path                    Synopsis
callreply               Package callreply is a small RPC layer that runs over any podshare.Transport.
examples/basic          (command) Basic example: two Stores share state through an in-process MemoryTransport.
examples/chat-cache     (command) Chat-history hot cache.
examples/feature-flags  (command) Feature flags with live propagation.
examples/p2p            (command) P2P mesh example.
examples/redis          (command) Redis-backed example.
examples/web            (command) Interactive web demo: 3 pods share state in one process, each exposing its own HTTP UI.
examples/ws-routing     (command) WebSocket routing across pods.
prom                    Package prom exposes a podshare Store's Stats() as a prometheus.Collector.
transport               Package transport contains pluggable Transport implementations for podshare: Memory (testing), Redis (pub/sub), and P2P (TCP mesh).
