dst

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AllBlobUUIDsSorted

func AllBlobUUIDsSorted(r *repo.Repo) ([]string, error)

AllBlobUUIDsSorted returns a sorted slice of UUIDs for deterministic comparison.

func CheckBlobIntegrity

func CheckBlobIntegrity(nodeID string, r *repo.Repo) error

CheckBlobIntegrity verifies that every blob in the repo has a UUID matching the hash of its expanded content. This catches corruption from buggify (content.Expand byte-flip) or storage bugs.

func CheckConvergence

func CheckConvergence(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckConvergence verifies that every leaf repo contains the same set of artifact UUIDs as the master repo.

func CheckDeltaChains

func CheckDeltaChains(nodeID string, r *repo.Repo) error

CheckDeltaChains verifies that every delta's srcid points to an existing blob (no dangling references).

func CheckNoOrphanPhantoms

func CheckNoOrphanPhantoms(nodeID string, r *repo.Repo) error

CheckNoOrphanPhantoms verifies that phantom entries reference blobs that actually exist in the blob table (they're just missing content).

func CheckSubsetOf

func CheckSubsetOf(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckSubsetOf verifies that all artifacts in the master are present in every leaf. Unlike CheckConvergence, this allows leaves to have extra artifacts (useful when only pull is being tested).

func CheckTableSyncConvergence

func CheckTableSyncConvergence(repos map[string]*repo.Repo) error

CheckTableSyncConvergence verifies that all repos have identical rows for every shared synced table. Tables present in some repos but not others are skipped (schema may still be propagating).

func CheckTableSyncIntegrity

func CheckTableSyncIntegrity(nodeID string, r *repo.Repo) error

CheckTableSyncIntegrity verifies that: 1. Every row in every synced table has a valid PK hash. 2. Every row's mtime is positive. 3. The computed catalog hash is 40 hex chars.

func CheckTagxrefIntegrity

func CheckTagxrefIntegrity(nodeID string, r *repo.Repo) error

CheckTagxrefIntegrity verifies that: 1. Every tagxref.rid references a valid blob 2. Every tagxref.tagid references a valid tag 3. Propagated entries (srcid=0) have tagtype=2 4. No tagxref.rid references a phantom blob

func CheckUVConvergence

func CheckUVConvergence(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckUVConvergence verifies that all repos have identical UV content hashes.

func CheckUVIntegrity

func CheckUVIntegrity(nodeID string, r *repo.Repo) error

CheckUVIntegrity verifies that every non-tombstone entry in the unversioned table has a hash matching the SHA1 of its decompressed content, and sz matches the content length.

func CountBlobs

func CountBlobs(r *repo.Repo) (int, error)

CountBlobs returns the number of non-phantom blobs in the repo.

func HasBlob

func HasBlob(r *repo.Repo, uuid string) bool

HasBlob checks if a specific artifact exists in the repo.

Types

type Action

type Action struct {
	Type   ActionType
	Result *sync.SyncResult // non-nil when Type == ActionSynced
	Err    error            // non-nil on sync failure
}

Action is the result of processing a single event via Tick.

type ActionType

type ActionType int

ActionType classifies the result of processing an event.

const (
	ActionNone   ActionType = iota
	ActionSynced            // a sync round was executed
)

type DefaultNode

type DefaultNode struct {
	// contains filtered or unexported fields
}

DefaultNode is a simple Node that calls sync.Sync directly. No NATS, no HTTP — just a repo + transport.

func NewDefaultNode

func NewDefaultNode(r *repo.Repo, t sync.Transport, projectCode, serverCode string, opts DefaultNodeOpts) *DefaultNode

NewDefaultNode creates a Node backed by sync.Sync.

func (*DefaultNode) Repo

func (n *DefaultNode) Repo() *repo.Repo

Repo returns the node's Fossil repository.

func (*DefaultNode) Tick

func (n *DefaultNode) Tick(ctx context.Context, event EventType) Action

Tick executes a sync cycle for the given event.

type DefaultNodeOpts

type DefaultNodeOpts struct {
	Push       bool
	Pull       bool
	UV         bool
	XTableSync bool
	Private    bool
	Buggify    sync.BuggifyChecker
}

DefaultNodeOpts configures a DefaultNode.

type Event

type Event struct {
	Time    time.Time
	Type    EventType
	NodeID  NodeID
	UVName  string // for EvUVWrite/EvUVDelete: file name
	UVData  []byte // for EvUVWrite: file content
	UVMTime int64  // for EvUVWrite/EvUVDelete: mtime
}

Event is a scheduled occurrence in the simulation.

type EventQueue

type EventQueue []*Event

EventQueue is a min-heap of events ordered by time.

func (EventQueue) Len

func (q EventQueue) Len() int

func (EventQueue) Less

func (q EventQueue) Less(i, j int) bool

func (*EventQueue) Pop

func (q *EventQueue) Pop() any

func (*EventQueue) PopEvent

func (q *EventQueue) PopEvent() *Event

PopEvent removes and returns the earliest event.

func (*EventQueue) Push

func (q *EventQueue) Push(x any)

func (*EventQueue) PushEvent

func (q *EventQueue) PushEvent(e *Event)

PushEvent adds an event to the queue.

func (EventQueue) Swap

func (q EventQueue) Swap(i, j int)

type EventType

type EventType int

EventType classifies simulation events.

const (
	EvTimer    EventType = iota // leaf poll timer fired
	EvSyncNow                   // manual sync trigger for a leaf
	EvUVWrite                   // write a UV file to a node's repo
	EvUVDelete                  // delete a UV file from a node's repo
)

type InvariantError

type InvariantError struct {
	Invariant string
	NodeID    string // "master" or leaf ID
	Detail    string
}

InvariantError records which invariant failed, on which node, and why.

func (*InvariantError) Error

func (e *InvariantError) Error() string

type MockFossil

type MockFossil struct {
	// contains filtered or unexported fields
}

MockFossil simulates a Fossil master server using the real HandleSync handler. It manages its own repo and implements sync.Transport by dispatching xfer messages to HandleSyncWithOpts.

func NewMockFossil

func NewMockFossil(r *repo.Repo) *MockFossil

NewMockFossil creates a MockFossil backed by the given repo.

func (*MockFossil) Exchange

func (f *MockFossil) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange handles one xfer request/response round by delegating to the real HandleSyncWithOpts. This ensures the DST tests exercise the same code path as production servers.

func (*MockFossil) Repo

func (f *MockFossil) Repo() *repo.Repo

Repo returns the mock fossil's repository (for seeding and invariants).

func (*MockFossil) SetBuggify

func (f *MockFossil) SetBuggify(b libsync.BuggifyChecker)

SetBuggify configures fault injection for the handler.

func (*MockFossil) StoreArtifact

func (f *MockFossil) StoreArtifact(data []byte) (string, error)

StoreArtifact adds a raw artifact to the mock fossil's repo. Returns the UUID. Used by tests to seed the master with content.

type Node

type Node interface {
	Tick(ctx context.Context, event EventType) Action
	Repo() *repo.Repo
}

Node represents a sync participant in the simulation. The simulator drives events by calling Tick; the Node executes a sync cycle and reports the result. EdgeSync can implement this by wrapping its agent.Agent; the DST provides a DefaultNode for in-module testing.

type NodeID

type NodeID string

NodeID identifies a node in the simulation.

type PeerNetwork

type PeerNetwork struct {
	// contains filtered or unexported fields
}

PeerNetwork simulates a peer-to-peer network where each leaf's Exchange calls HandleSync on the designated peer's repo. No bridge, no central server — pure leaf-to-leaf sync under DST control.

func NewPeerNetwork

func NewPeerNetwork(rng *rand.Rand) *PeerNetwork

NewPeerNetwork creates a simulated peer network.

func (*PeerNetwork) AddPeer

func (n *PeerNetwork) AddPeer(id NodeID, r *repo.Repo)

AddPeer registers a leaf's repo as a sync target for other leaves.

func (*PeerNetwork) Heal

func (n *PeerNetwork) Heal(id NodeID)

Heal removes a node from the partition set.

func (*PeerNetwork) HealAll

func (n *PeerNetwork) HealAll()

HealAll removes all partitions.

func (*PeerNetwork) Partition

func (n *PeerNetwork) Partition(id NodeID)

Partition isolates a node — all messages to/from it are dropped.

func (*PeerNetwork) SetBuggify

func (n *PeerNetwork) SetBuggify(b libsync.BuggifyChecker)

SetBuggify configures fault injection for the handler.

func (*PeerNetwork) SetDropRate

func (n *PeerNetwork) SetDropRate(rate float64)

SetDropRate sets the probability that any message is dropped.

func (*PeerNetwork) Transport

func (n *PeerNetwork) Transport(source, target NodeID) *PeerTransport

Transport returns a sync.Transport for the given source node that routes to a specific target peer's HandleSync.

type PeerTransport

type PeerTransport struct {
	// contains filtered or unexported fields
}

PeerTransport implements sync.Transport by routing through the PeerNetwork to a specific target peer.

func (*PeerTransport) Exchange

func (t *PeerTransport) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange sends a request through the peer network to the target's HandleSync.

type SeededBuggify

type SeededBuggify struct {
	// contains filtered or unexported fields
}

SeededBuggify implements sync.BuggifyChecker with a deterministic PRNG.

func (*SeededBuggify) Check

func (b *SeededBuggify) Check(_ string, probability float64) bool

Check returns true with the given probability, using the seeded PRNG.

type SimConfig

type SimConfig struct {
	Seed                int64
	NumLeaves           int
	PollInterval        time.Duration
	TmpDir              string            // directory for repo files
	Upstream            libsync.Transport // mock Fossil master
	Buggify             bool              // enable BUGGIFY fault injection
	UV                  bool              // sync unversioned files
	Private             bool              // sync private artifacts
	SafetyCheckInterval int               // run CheckSafety() every N steps; 0 = disabled
}

SimConfig configures a simulation run.

type SimNetwork

type SimNetwork struct {
	// contains filtered or unexported fields
}

SimNetwork simulates the network between leaf agents and the upstream transport (mock Fossil master). It supports message dropping, response truncation, and network partitions, controlled by a seeded PRNG for deterministic behavior.

In the original EdgeSync DST, messages routed through a Bridge that forwarded to the upstream. Since Bridge.HandleRequest was just upstream.Exchange, SimNetwork now calls the upstream directly.

func NewSimNetwork

func NewSimNetwork(rng *rand.Rand, upstream libsync.Transport) *SimNetwork

NewSimNetwork creates a simulated network connected to the given upstream transport.

func (*SimNetwork) Heal

func (n *SimNetwork) Heal(id NodeID)

Heal removes a node from the partition set.

func (*SimNetwork) HealAll

func (n *SimNetwork) HealAll()

HealAll removes all partitions.

func (*SimNetwork) Partition

func (n *SimNetwork) Partition(id NodeID)

Partition isolates a node — all messages to/from it are dropped.

func (*SimNetwork) SetDropRate

func (n *SimNetwork) SetDropRate(rate float64)

SetDropRate sets the probability that any message is dropped entirely.

func (*SimNetwork) SetTruncateRate

func (n *SimNetwork) SetTruncateRate(rate float64)

SetTruncateRate sets the probability that a response is truncated (random suffix of cards dropped). Simulates partial delivery.

func (*SimNetwork) Transport

func (n *SimNetwork) Transport(nodeID NodeID) *SimTransport

Transport returns a sync.Transport for the given node that routes messages through this simulated network to the upstream.

type SimTransport

type SimTransport struct {
	// contains filtered or unexported fields
}

SimTransport implements sync.Transport by routing through the SimNetwork.

func (*SimTransport) Exchange

func (t *SimTransport) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange sends a request through the simulated network to the upstream.

type Simulator

type Simulator struct {
	Seed int64

	// Stats
	Steps         int
	TotalSyncs    int
	TotalErrors   int
	TotalUVSent   int
	TotalUVRecvd  int
	TotalUVGimmes int
	// contains filtered or unexported fields
}

Simulator drives a deterministic simulation of multiple leaf nodes syncing through a simulated network to an upstream transport (mock Fossil master).

func New

func New(cfg SimConfig) (*Simulator, error)

New creates a Simulator with the given configuration. It creates leaf repos, nodes, and the simulated network. All I/O is local SQLite — no NATS or HTTP connections are made.

func (*Simulator) CheckAllConverged

func (s *Simulator) CheckAllConverged(master *repo.Repo) error

CheckAllConverged checks convergence between master and all leaves. Only meaningful after a fault-free sync period.

func (*Simulator) CheckAllTableSyncConverged

func (s *Simulator) CheckAllTableSyncConverged() error

CheckAllTableSyncConverged checks table sync convergence across all nodes.

func (*Simulator) CheckAllUVConverged

func (s *Simulator) CheckAllUVConverged(master *repo.Repo) error

CheckAllUVConverged checks UV convergence between master and all leaves.

func (*Simulator) CheckSafety

func (s *Simulator) CheckSafety() error

CheckSafety runs all safety invariants on every node in the simulation.

func (*Simulator) CheckTombstoneConvergence

func (s *Simulator) CheckTombstoneConvergence(t *testing.T, masterRepo *repo.Repo, tableName string, def repo.TableDef)

CheckTombstoneConvergence verifies that all nodes agree on which rows are tombstones for each synced table. Two nodes may have different catalog hashes during convergence, but once converged, their tombstone sets must match.

func (*Simulator) Clock

func (s *Simulator) Clock() *simio.SimClock

Clock returns the simulator's virtual clock.

func (*Simulator) Close

func (s *Simulator) Close() error

Close cleans up all leaf node repos and disables buggify. Iterates leafIDs (not map) for deterministic error reporting.

func (*Simulator) Leaf

func (s *Simulator) Leaf(id NodeID) Node

Leaf returns the node for the given node ID.

func (*Simulator) LeafIDs

func (s *Simulator) LeafIDs() []NodeID

LeafIDs returns the ordered list of leaf node IDs.

func (*Simulator) Network

func (s *Simulator) Network() *SimNetwork

Network returns the simulated network for fault injection.

func (*Simulator) Run

func (s *Simulator) Run(maxSteps int) error

Run processes up to maxSteps events. Returns nil on success or the first invariant/error encountered.

func (*Simulator) RunUntil

func (s *Simulator) RunUntil(deadline time.Time) error

RunUntil processes events until the clock reaches the given time.

func (*Simulator) ScheduleSyncNow

func (s *Simulator) ScheduleSyncNow(id NodeID)

ScheduleSyncNow injects a SyncNow event for the given leaf at the current time.

func (*Simulator) ScheduleUVDelete

func (s *Simulator) ScheduleUVDelete(id NodeID, at time.Time, name string, mtime int64)

ScheduleUVDelete injects a UV delete event for the given node at the specified time.

func (*Simulator) ScheduleUVWrite

func (s *Simulator) ScheduleUVWrite(id NodeID, at time.Time, name string, data []byte, mtime int64)

ScheduleUVWrite injects a UV write event for the given node at the specified time.

func (*Simulator) SetMasterRepo

func (s *Simulator) SetMasterRepo(r *repo.Repo)

SetMasterRepo registers the master repo for UV events targeting "master".

func (*Simulator) Step

func (s *Simulator) Step() (bool, error)

Step processes the next event in the queue. Returns false if the queue is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL