Documentation ¶
Overview ¶
Package state implements the persistence layer: SQLite repos, StateEngine, dirty-set flush, consistency repair, and bootstrap.
Index ¶
- Variables
- func InitDB(db *sql.DB, ddl string) error
- func MigrateCacheDB(db *sql.DB) error
- func MigrateStateDB(db *sql.DB) error
- func OpenDB(path string) (*sql.DB, error)
- func RepairConsistency(stateDBPath string, cacheDB *sql.DB) error
- type CacheFlushWorker
- type CacheReaders
- type CacheRepo
- func (r *CacheRepo) BulkDeleteLeases(keys []model.LeaseKey) error
- func (r *CacheRepo) BulkDeleteNodeLatency(keys []model.NodeLatencyKey) error
- func (r *CacheRepo) BulkDeleteNodesDynamic(hashes []string) error
- func (r *CacheRepo) BulkDeleteNodesStatic(hashes []string) error
- func (r *CacheRepo) BulkDeleteSubscriptionNodes(keys []model.SubscriptionNodeKey) error
- func (r *CacheRepo) BulkUpsertLeases(leases []model.Lease) error
- func (r *CacheRepo) BulkUpsertNodeLatency(entries []model.NodeLatency) error
- func (r *CacheRepo) BulkUpsertNodesDynamic(nodes []model.NodeDynamic) error
- func (r *CacheRepo) BulkUpsertNodesStatic(nodes []model.NodeStatic) error
- func (r *CacheRepo) BulkUpsertSubscriptionNodes(nodes []model.SubscriptionNode) error
- func (r *CacheRepo) FlushTx(ops FlushOps) error
- func (r *CacheRepo) LoadAllLeases() ([]model.Lease, error)
- func (r *CacheRepo) LoadAllNodeLatency() ([]model.NodeLatency, error)
- func (r *CacheRepo) LoadAllNodesDynamic() ([]model.NodeDynamic, error)
- func (r *CacheRepo) LoadAllNodesStatic() ([]model.NodeStatic, error)
- func (r *CacheRepo) LoadAllSubscriptionNodes() ([]model.SubscriptionNode, error)
- type DirtyOp
- type DirtySet
- type FlushOps
- type LeaseDirtyKey
- type NodeLatencyDirtyKey
- type StateEngine
- func (e *StateEngine) DirtyCount() int
- func (e *StateEngine) FlushDirtySets(readers CacheReaders) error
- func (e *StateEngine) MarkLease(platformID, account string)
- func (e *StateEngine) MarkLeaseDelete(platformID, account string)
- func (e *StateEngine) MarkNodeDynamic(hash string)
- func (e *StateEngine) MarkNodeDynamicDelete(hash string)
- func (e *StateEngine) MarkNodeLatency(nodeHash, domain string)
- func (e *StateEngine) MarkNodeLatencyDelete(nodeHash, domain string)
- func (e *StateEngine) MarkNodeStatic(hash string)
- func (e *StateEngine) MarkNodeStaticDelete(hash string)
- func (e *StateEngine) MarkSubscriptionNode(subID, nodeHash string)
- func (e *StateEngine) MarkSubscriptionNodeDelete(subID, nodeHash string)
- type StateRepo
- func (r *StateRepo) DeleteAccountHeaderRule(prefix string) error
- func (r *StateRepo) DeletePlatform(id string) error
- func (r *StateRepo) DeleteSubscription(id string) error
- func (r *StateRepo) EnsureAccountHeaderRule(rule model.AccountHeaderRule) (bool, error)
- func (r *StateRepo) GetPlatform(id string) (*model.Platform, error)
- func (r *StateRepo) GetPlatformName(id string) (string, error)
- func (r *StateRepo) GetSystemConfig() (*config.RuntimeConfig, int, error)
- func (r *StateRepo) ListAccountHeaderRules() ([]model.AccountHeaderRule, error)
- func (r *StateRepo) ListPlatforms() ([]model.Platform, error)
- func (r *StateRepo) ListSubscriptions() ([]model.Subscription, error)
- func (r *StateRepo) SaveSystemConfig(cfg *config.RuntimeConfig, version int, updatedAtNs int64) error
- func (r *StateRepo) UpsertAccountHeaderRuleWithCreated(rule model.AccountHeaderRule) (bool, error)
- func (r *StateRepo) UpsertPlatform(p model.Platform) error
- func (r *StateRepo) UpsertSubscription(s model.Subscription) error
- type SubscriptionNodeDirtyKey
Constants ¶
This section is empty.
Variables ¶
var ErrConflict = errors.New("conflict")
ErrConflict is returned when a write violates a uniqueness/conflict constraint.
var ErrNotFound = errors.New("not found")
ErrNotFound is returned when a requested resource does not exist in the database.
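A caller would typically branch on these sentinels with errors.Is. The sketch below redeclares both variables locally so it compiles on its own; the lookup helper is hypothetical, and whether the package wraps the sentinels with extra context is not stated here (errors.Is handles both cases).

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for state.ErrNotFound and state.ErrConflict.
var (
	ErrNotFound = errors.New("not found")
	ErrConflict = errors.New("conflict")
)

// lookup is a hypothetical repository call that wraps the sentinel with context.
func lookup(id string) error {
	if id == "" {
		return fmt.Errorf("get platform %q: %w", id, ErrNotFound)
	}
	return nil
}

// classify maps an error to a coarse outcome. errors.Is also matches
// sentinels that were wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotFound):
		return "missing"
	case errors.Is(err, ErrConflict):
		return "conflict"
	default:
		return "internal"
	}
}

func main() {
	fmt.Println(classify(lookup("")))
	fmt.Println(classify(lookup("p1")))
}
```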
Functions ¶
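func InitDB ¶
func InitDB(db *sql.DB, ddl string) error
InitDB executes raw DDL statements on the given database. Used by non-state SQLite stores (for example, metrics and request logs).
func MigrateCacheDB ¶
func MigrateCacheDB(db *sql.DB) error
MigrateCacheDB applies cache.db migrations.
func MigrateStateDB ¶
func MigrateStateDB(db *sql.DB) error
MigrateStateDB applies state.db migrations.
func OpenDB ¶
func OpenDB(path string) (*sql.DB, error)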
OpenDB opens (or creates) a SQLite database at path with recommended pragmas: WAL journal mode, synchronous=NORMAL, foreign_keys=ON, busy_timeout=5000.
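The pragmas listed above could be applied to an open handle roughly as follows. This is a sketch, not the package's implementation: the exact statements OpenDB issues, and whether it sets them through the driver DSN instead, are assumptions. No SQLite driver is imported, so main only prints the statements.

```go
package main

import (
	"database/sql"
	"fmt"
)

// recommendedPragmas mirrors the settings named in the OpenDB description.
// The statement form is an assumption.
var recommendedPragmas = []string{
	"PRAGMA journal_mode=WAL",
	"PRAGMA synchronous=NORMAL",
	"PRAGMA foreign_keys=ON",
	"PRAGMA busy_timeout=5000",
}

// applyPragmas runs each pragma on an already opened handle and stops at
// the first failure.
func applyPragmas(db *sql.DB) error {
	for _, stmt := range recommendedPragmas {
		if _, err := db.Exec(stmt); err != nil {
			return fmt.Errorf("%s: %w", stmt, err)
		}
	}
	return nil
}

func main() {
	for _, stmt := range recommendedPragmas {
		fmt.Println(stmt)
	}
}
```

Most of these settings apply per connection, so with the database/sql connection pool a sketch like this is only reliable if the pool is limited to one connection or the pragmas are set at connect time.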
func RepairConsistency ¶
func RepairConsistency(stateDBPath string, cacheDB *sql.DB) error
RepairConsistency runs orphan-cleanup SQL on cache.db, cross-referencing state.db via ATTACH. All DELETEs execute in a single transaction to avoid half-repaired state on crash.
Cleanup order (by dependency):
- subscription_nodes: remove entries whose subscription_id is missing from state.subscriptions OR (for non-evicted rows) whose node_hash is missing from nodes_static.
- nodes_static: remove entries with no remaining non-evicted reference in subscription_nodes.
- nodes_dynamic: remove entries whose hash is missing from nodes_static.
- node_latency: remove entries whose node_hash is missing from nodes_static.
- leases: remove entries whose platform_id is missing from state.platforms OR whose node_hash is missing from nodes_static.
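The dependency order above can be illustrated with an in-memory model. This is a toy simulation and not the SQL that RepairConsistency runs: the struct shapes, the Evicted flag, and the demo data are assumptions chosen to match the wording of the list.

```go
package main

import "fmt"

// subNode is a simplified subscription_nodes row; Evicted is an assumed flag.
type subNode struct {
	SubID, NodeHash string
	Evicted         bool
}

type lease struct{ PlatformID, Account, NodeHash string }

// cache is a toy stand-in for cache.db; state holds the IDs present in state.db.
type cache struct {
	SubNodes     []subNode
	NodesStatic  map[string]bool
	NodesDynamic map[string]bool
	Latency      map[[2]string]bool // key: {node_hash, domain}
	Leases       []lease
}

type state struct{ Subscriptions, Platforms map[string]bool }

func repair(c *cache, s state) {
	// 1. subscription_nodes: drop rows with a missing subscription, or
	// non-evicted rows whose node is missing from nodes_static.
	referenced := map[string]bool{}
	kept := c.SubNodes[:0]
	for _, sn := range c.SubNodes {
		if !s.Subscriptions[sn.SubID] || (!sn.Evicted && !c.NodesStatic[sn.NodeHash]) {
			continue
		}
		kept = append(kept, sn)
		if !sn.Evicted {
			referenced[sn.NodeHash] = true
		}
	}
	c.SubNodes = kept
	// 2. nodes_static: drop nodes with no remaining non-evicted reference.
	for h := range c.NodesStatic {
		if !referenced[h] {
			delete(c.NodesStatic, h)
		}
	}
	// 3 and 4. nodes_dynamic and node_latency follow nodes_static.
	for h := range c.NodesDynamic {
		if !c.NodesStatic[h] {
			delete(c.NodesDynamic, h)
		}
	}
	for k := range c.Latency {
		if !c.NodesStatic[k[0]] {
			delete(c.Latency, k)
		}
	}
	// 5. leases: need both a live platform and a live node.
	keptLeases := c.Leases[:0]
	for _, l := range c.Leases {
		if s.Platforms[l.PlatformID] && c.NodesStatic[l.NodeHash] {
			keptLeases = append(keptLeases, l)
		}
	}
	c.Leases = keptLeases
}

// demo builds a cache where subscription "gone" and platform "gone" no
// longer exist in state, then repairs it.
func demo() *cache {
	c := &cache{
		SubNodes:     []subNode{{"s1", "n1", false}, {"gone", "n2", false}},
		NodesStatic:  map[string]bool{"n1": true, "n2": true},
		NodesDynamic: map[string]bool{"n1": true, "n2": true},
		Latency:      map[[2]string]bool{{"n1", "example.com"}: true, {"n2", "example.com"}: true},
		Leases:       []lease{{"p1", "alice", "n1"}, {"p1", "bob", "n2"}, {"gone", "carol", "n1"}},
	}
	repair(c, state{
		Subscriptions: map[string]bool{"s1": true},
		Platforms:     map[string]bool{"p1": true},
	})
	return c
}

func main() {
	c := demo()
	fmt.Println(len(c.SubNodes), len(c.NodesStatic), len(c.NodesDynamic), len(c.Latency), len(c.Leases))
}
```

Running the steps in this order matters: node n2 only becomes an orphan after its sole subscription link is removed in step 1, and the later steps depend on the pruned nodes_static.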
Types ¶
type CacheFlushWorker ¶
type CacheFlushWorker struct {
// contains filtered or unexported fields
}
CacheFlushWorker periodically flushes dirty sets to cache.db. It triggers a flush when:
- DirtyCount() >= threshold, OR
- time.Since(lastFlush) >= interval (and dirty count > 0)
On Stop(), a final flush is performed before returning.
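The two flush triggers above reduce to a small predicate. The function name and parameters below are hypothetical; the sketch only restates the documented conditions.

```go
package main

import (
	"fmt"
	"time"
)

// shouldFlush reports whether either documented trigger fires:
// the dirty count reached the threshold, or the interval elapsed while
// at least one entry is dirty. Thresholds <= 0 get no special handling here.
func shouldFlush(dirty, threshold int, sinceLast, interval time.Duration) bool {
	if dirty >= threshold {
		return true
	}
	return dirty > 0 && sinceLast >= interval
}

func main() {
	fmt.Println(shouldFlush(10, 10, time.Second, time.Minute))  // threshold reached
	fmt.Println(shouldFlush(1, 10, 2*time.Minute, time.Minute)) // interval elapsed, dirty > 0
	fmt.Println(shouldFlush(0, 10, 2*time.Minute, time.Minute)) // nothing to flush
}
```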
func NewCacheFlushWorker ¶
func NewCacheFlushWorker(
	engine *StateEngine,
	readers CacheReaders,
	thresholdFn func() int,
	intervalFn func() time.Duration,
	checkTick time.Duration,
) *CacheFlushWorker
NewCacheFlushWorker creates a flush worker that pulls threshold/interval from callbacks on each check cycle. checkTick controls how often flush conditions are evaluated (e.g. 5s).
func (*CacheFlushWorker) Start ¶
func (w *CacheFlushWorker) Start()
Start launches the background flush goroutine.
func (*CacheFlushWorker) Stop ¶
func (w *CacheFlushWorker) Stop()
Stop signals the worker to stop and performs a final flush. Blocks until the goroutine exits.
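A Start/Stop pair with a final flush is commonly built from a ticker, a stop channel, and a done channel. The worker type below is a simplified, hypothetical stand-in: it flushes on every tick instead of checking threshold and interval, and its internals are not taken from CacheFlushWorker.

```go
package main

import (
	"fmt"
	"time"
)

type worker struct {
	flush func()
	tick  time.Duration
	stop  chan struct{}
	done  chan struct{}
}

func newWorker(flush func(), tick time.Duration) *worker {
	return &worker{flush: flush, tick: tick, stop: make(chan struct{}), done: make(chan struct{})}
}

// Start launches the background goroutine.
func (w *worker) Start() {
	go func() {
		defer close(w.done)
		t := time.NewTicker(w.tick)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				w.flush() // a real worker would check its flush conditions first
			case <-w.stop:
				w.flush() // final flush before the goroutine exits
				return
			}
		}
	}()
}

// Stop signals the goroutine and blocks until it has exited.
// Calling Stop twice would panic in this sketch (double close).
func (w *worker) Stop() {
	close(w.stop)
	<-w.done
}

// finalFlushCount starts and immediately stops a worker whose tick is too
// long to fire, so the only flush is the final one.
func finalFlushCount() int {
	n := 0
	w := newWorker(func() { n++ }, time.Hour)
	w.Start()
	w.Stop() // the done channel orders the goroutine's write before this read
	return n
}

func main() {
	fmt.Println(finalFlushCount())
}
```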
type CacheReaders ¶
type CacheReaders struct {
ReadNodeStatic func(hash string) *model.NodeStatic
ReadNodeDynamic func(hash string) *model.NodeDynamic
ReadNodeLatency func(key NodeLatencyDirtyKey) *model.NodeLatency
ReadLease func(key LeaseDirtyKey) *model.Lease
ReadSubscriptionNode func(key SubscriptionNodeDirtyKey) *model.SubscriptionNode
}
CacheReaders provides callbacks for reading current in-memory values at flush time. If a reader returns nil for a key marked OpUpsert, the key is treated as a delete (the object was removed between mark and flush).
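The nil-means-delete rule can be sketched for a single dirty set keyed by hash. The op constants, the node type, and the resolve helper are hypothetical names; only the rule itself comes from the description above.

```go
package main

import (
	"fmt"
	"sort"
)

type op int

const (
	opUpsert op = iota
	opDelete
)

type node struct{ Hash string }

// resolve splits drained keys into upserts and deletes. A key marked for
// upsert whose reader returns nil is downgraded to a delete.
func resolve(dirty map[string]op, read func(hash string) *node) (upserts []node, deletes []string) {
	for hash, o := range dirty {
		if o == opUpsert {
			if n := read(hash); n != nil {
				upserts = append(upserts, *n)
				continue
			}
		}
		deletes = append(deletes, hash)
	}
	sort.Strings(deletes) // map iteration order is random; sort for stable output
	return upserts, deletes
}

func main() {
	live := map[string]*node{"a": {Hash: "a"}}
	dirty := map[string]op{"a": opUpsert, "b": opUpsert, "c": opDelete}
	ups, dels := resolve(dirty, func(h string) *node { return live[h] })
	fmt.Println(len(ups), dels)
}
```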
type CacheRepo ¶
type CacheRepo struct {
// contains filtered or unexported fields
}
CacheRepo wraps cache.db and provides batch read/write for weak-persist data.
func (*CacheRepo) BulkDeleteLeases ¶
func (r *CacheRepo) BulkDeleteLeases(keys []model.LeaseKey) error
BulkDeleteLeases batch-deletes lease records by composite key.
func (*CacheRepo) BulkDeleteNodeLatency ¶
func (r *CacheRepo) BulkDeleteNodeLatency(keys []model.NodeLatencyKey) error
BulkDeleteNodeLatency batch-deletes node latency records by composite key.
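func (*CacheRepo) BulkDeleteNodesDynamic ¶
func (r *CacheRepo) BulkDeleteNodesDynamic(hashes []string) error
BulkDeleteNodesDynamic batch-deletes node dynamic records by hash.
func (*CacheRepo) BulkDeleteNodesStatic ¶
func (r *CacheRepo) BulkDeleteNodesStatic(hashes []string) error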
BulkDeleteNodesStatic batch-deletes node static records by hash.
func (*CacheRepo) BulkDeleteSubscriptionNodes ¶
func (r *CacheRepo) BulkDeleteSubscriptionNodes(keys []model.SubscriptionNodeKey) error
BulkDeleteSubscriptionNodes batch-deletes subscription-node links by composite key.
func (*CacheRepo) BulkUpsertLeases ¶
func (r *CacheRepo) BulkUpsertLeases(leases []model.Lease) error
BulkUpsertLeases batch-inserts or updates lease records.
func (*CacheRepo) BulkUpsertNodeLatency ¶
func (r *CacheRepo) BulkUpsertNodeLatency(entries []model.NodeLatency) error
BulkUpsertNodeLatency batch-inserts or updates node latency records.
func (*CacheRepo) BulkUpsertNodesDynamic ¶
func (r *CacheRepo) BulkUpsertNodesDynamic(nodes []model.NodeDynamic) error
BulkUpsertNodesDynamic batch-inserts or updates node dynamic records.
func (*CacheRepo) BulkUpsertNodesStatic ¶
func (r *CacheRepo) BulkUpsertNodesStatic(nodes []model.NodeStatic) error
BulkUpsertNodesStatic batch-inserts or updates node static records.
func (*CacheRepo) BulkUpsertSubscriptionNodes ¶
func (r *CacheRepo) BulkUpsertSubscriptionNodes(nodes []model.SubscriptionNode) error
BulkUpsertSubscriptionNodes batch-inserts or updates subscription-node links.
func (*CacheRepo) FlushTx ¶
func (r *CacheRepo) FlushTx(ops FlushOps) error
FlushTx executes all upserts and deletes in a single transaction.
Upsert order: nodes_static → subscription_nodes → nodes_dynamic → node_latency → leases
Delete order: leases → node_latency → nodes_dynamic → subscription_nodes → nodes_static
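The delete order is the exact reverse of the upsert order: referenced tables are written first and removed last. The sketch below derives one list from the other. The helper name is hypothetical, and whether FlushTx runs all upserts before all deletes inside the transaction is not stated here.

```go
package main

import "fmt"

// upsertOrder lists tables so that referenced rows are written before
// the rows that depend on them.
var upsertOrder = []string{
	"nodes_static",
	"subscription_nodes",
	"nodes_dynamic",
	"node_latency",
	"leases",
}

// deleteOrder returns the reverse of upsertOrder, so dependent rows are
// removed before the rows they reference.
func deleteOrder() []string {
	out := make([]string, len(upsertOrder))
	for i, table := range upsertOrder {
		out[len(out)-1-i] = table
	}
	return out
}

func main() {
	fmt.Println(upsertOrder)
	fmt.Println(deleteOrder())
}
```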
func (*CacheRepo) LoadAllLeases ¶
func (r *CacheRepo) LoadAllLeases() ([]model.Lease, error)
LoadAllLeases reads all lease records.
func (*CacheRepo) LoadAllNodeLatency ¶
func (r *CacheRepo) LoadAllNodeLatency() ([]model.NodeLatency, error)
LoadAllNodeLatency reads all node latency records.
func (*CacheRepo) LoadAllNodesDynamic ¶
func (r *CacheRepo) LoadAllNodesDynamic() ([]model.NodeDynamic, error)
LoadAllNodesDynamic reads all node dynamic records.
func (*CacheRepo) LoadAllNodesStatic ¶
func (r *CacheRepo) LoadAllNodesStatic() ([]model.NodeStatic, error)
LoadAllNodesStatic reads all node static records.
func (*CacheRepo) LoadAllSubscriptionNodes ¶
func (r *CacheRepo) LoadAllSubscriptionNodes() ([]model.SubscriptionNode, error)
LoadAllSubscriptionNodes reads all subscription-node links.
type DirtySet ¶
type DirtySet[K comparable] struct {
// contains filtered or unexported fields
}
DirtySet tracks dirty keys with their operation type. It stores only keys — values are read from memory at flush time. Thread-safe via mutex; drain uses map-swap for a stable snapshot.
func NewDirtySet ¶
func NewDirtySet[K comparable]() *DirtySet[K]
NewDirtySet creates an empty DirtySet.
func (*DirtySet[K]) Drain ¶
Drain atomically swaps the internal map with a fresh one and returns the old map as a stable snapshot. Concurrent marks after Drain go into the new map.
func (*DirtySet[K]) MarkDelete ¶
func (d *DirtySet[K]) MarkDelete(key K)
MarkDelete marks a key for deletion.
func (*DirtySet[K]) MarkUpsert ¶
func (d *DirtySet[K]) MarkUpsert(key K)
MarkUpsert marks a key for upsert.
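A mutex-protected set with a map-swap drain, as described for DirtySet, might look like the following. The lowercase names are hypothetical, and the choice that the last mark for a key wins is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

type op int

const (
	opUpsert op = iota
	opDelete
)

// dirtySet stores only keys and their pending operation.
type dirtySet[K comparable] struct {
	mu sync.Mutex
	m  map[K]op
}

func newDirtySet[K comparable]() *dirtySet[K] {
	return &dirtySet[K]{m: make(map[K]op)}
}

// mark records the operation for key; a later mark overwrites an earlier one.
func (d *dirtySet[K]) mark(key K, o op) {
	d.mu.Lock()
	d.m[key] = o
	d.mu.Unlock()
}

// drain swaps in a fresh map under the lock and returns the old one.
// The returned map is never touched again by the set, so the caller can
// iterate it without holding the lock.
func (d *dirtySet[K]) drain() map[K]op {
	d.mu.Lock()
	defer d.mu.Unlock()
	old := d.m
	d.m = make(map[K]op)
	return old
}

func main() {
	s := newDirtySet[string]()
	s.mark("a", opUpsert)
	s.mark("a", opDelete)
	s.mark("b", opUpsert)
	snap := s.drain()
	s.mark("c", opUpsert) // lands in the new map, not in snap
	fmt.Println(len(snap), len(s.drain()))
}
```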
type FlushOps ¶
type FlushOps struct {
UpsertNodesStatic []model.NodeStatic
DeleteNodesStatic []string
UpsertSubscriptionNodes []model.SubscriptionNode
DeleteSubscriptionNodes []model.SubscriptionNodeKey
UpsertNodesDynamic []model.NodeDynamic
DeleteNodesDynamic []string
UpsertNodeLatency []model.NodeLatency
DeleteNodeLatency []model.NodeLatencyKey
UpsertLeases []model.Lease
DeleteLeases []model.LeaseKey
}
FlushOps holds all upsert/delete slices for a single-transaction cache flush.
type LeaseDirtyKey ¶
LeaseDirtyKey is the composite key for the leases dirty set.
type NodeLatencyDirtyKey ¶
type NodeLatencyDirtyKey = model.NodeLatencyKey
NodeLatencyDirtyKey is the composite key for the node_latency dirty set.
type StateEngine ¶
StateEngine is the single write entry point for all persistence operations. Strong-persist data (config, platforms, subscriptions, rules) goes through transactional writes to state.db. Weak-persist data (nodes, leases) is marked dirty and batch-flushed to cache.db.
func PersistenceBootstrap ¶
func PersistenceBootstrap(stateDir, cacheDir string) (engine *StateEngine, closer io.Closer, err error)
PersistenceBootstrap initializes both databases, runs consistency repair, and returns a ready-to-use StateEngine plus an io.Closer for the DB handles.
Steps:
- Open/create state.db and cache.db with recommended pragmas.
- Run schema migrations on both databases.
- Run consistency repair (cross-db orphan cleanup).
- Construct and return StateEngine.
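The four steps above form a fail-fast sequence. The sketch below models them as named steps and, as an assumption, closes any opened handles when a step fails; the step names and the runBootstrap helper are hypothetical and do not touch a real database.

```go
package main

import (
	"errors"
	"fmt"
)

type step struct {
	name string
	run  func() error
}

// bootstrap runs the steps in order. On the first failure it calls
// closeAll and returns the error wrapped with the step name.
func bootstrap(steps []step, closeAll func()) error {
	for _, s := range steps {
		if err := s.run(); err != nil {
			closeAll()
			return fmt.Errorf("bootstrap %s: %w", s.name, err)
		}
	}
	return nil
}

// runBootstrap records which steps ran; failAt injects a failure into
// the step with that name ("" means no failure).
func runBootstrap(failAt string) ([]string, error) {
	var trace []string
	names := []string{"open", "migrate", "repair", "engine"}
	steps := make([]step, 0, len(names))
	for _, name := range names {
		name := name // capture per-iteration value for older Go versions
		steps = append(steps, step{name, func() error {
			trace = append(trace, name)
			if name == failAt {
				return errors.New("injected failure")
			}
			return nil
		}})
	}
	err := bootstrap(steps, func() { trace = append(trace, "close") })
	return trace, err
}

func main() {
	fmt.Println(runBootstrap(""))
	fmt.Println(runBootstrap("migrate"))
}
```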
func (*StateEngine) DirtyCount ¶
func (e *StateEngine) DirtyCount() int
DirtyCount returns the total number of dirty entries across all sets.
func (*StateEngine) FlushDirtySets ¶
func (e *StateEngine) FlushDirtySets(readers CacheReaders) error
FlushDirtySets drains all dirty sets, reads current values via readers, and batch-writes to cache.db in a single transaction. On failure, the drained entries are merged back into the dirty sets.
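Merging entries back after a failed flush has to deal with keys that were marked again in the meantime. The sketch below assumes the newer mark takes precedence; that policy and the mergeBack helper are assumptions, not documented behavior.

```go
package main

import "fmt"

type op int

const (
	opUpsert op = iota
	opDelete
)

// mergeBack restores drained entries into the current dirty map after a
// failed flush. Keys marked again since the drain keep their newer op.
func mergeBack[K comparable](current, drained map[K]op) {
	for key, o := range drained {
		if _, newer := current[key]; !newer {
			current[key] = o
		}
	}
}

func main() {
	drained := map[string]op{"a": opUpsert, "b": opUpsert}
	current := map[string]op{"a": opDelete} // "a" was re-marked during the flush
	mergeBack(current, drained)
	fmt.Println(len(current), current["a"] == opDelete, current["b"] == opUpsert)
}
```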
func (*StateEngine) MarkLease ¶
func (e *StateEngine) MarkLease(platformID, account string)
func (*StateEngine) MarkLeaseDelete ¶
func (e *StateEngine) MarkLeaseDelete(platformID, account string)
func (*StateEngine) MarkNodeDynamic ¶
func (e *StateEngine) MarkNodeDynamic(hash string)
func (*StateEngine) MarkNodeDynamicDelete ¶
func (e *StateEngine) MarkNodeDynamicDelete(hash string)
func (*StateEngine) MarkNodeLatency ¶
func (e *StateEngine) MarkNodeLatency(nodeHash, domain string)
func (*StateEngine) MarkNodeLatencyDelete ¶
func (e *StateEngine) MarkNodeLatencyDelete(nodeHash, domain string)
func (*StateEngine) MarkNodeStatic ¶
func (e *StateEngine) MarkNodeStatic(hash string)
func (*StateEngine) MarkNodeStaticDelete ¶
func (e *StateEngine) MarkNodeStaticDelete(hash string)
func (*StateEngine) MarkSubscriptionNode ¶
func (e *StateEngine) MarkSubscriptionNode(subID, nodeHash string)
func (*StateEngine) MarkSubscriptionNodeDelete ¶
func (e *StateEngine) MarkSubscriptionNodeDelete(subID, nodeHash string)
type StateRepo ¶
type StateRepo struct {
// contains filtered or unexported fields
}
StateRepo wraps state.db and provides transactional CRUD for strong-persist data. All writes are serialized by an internal mutex.
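func (*StateRepo) DeleteAccountHeaderRule ¶
func (r *StateRepo) DeleteAccountHeaderRule(prefix string) error
DeleteAccountHeaderRule removes a rule by url_prefix.
func (*StateRepo) DeletePlatform ¶
func (r *StateRepo) DeletePlatform(id string) error
DeletePlatform removes a platform by ID.
func (*StateRepo) DeleteSubscription ¶
func (r *StateRepo) DeleteSubscription(id string) error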
DeleteSubscription removes a subscription by ID.
func (*StateRepo) EnsureAccountHeaderRule ¶
func (r *StateRepo) EnsureAccountHeaderRule(rule model.AccountHeaderRule) (bool, error)
EnsureAccountHeaderRule inserts a rule by url_prefix only when it does not already exist and reports whether the row was newly created.
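func (*StateRepo) GetPlatform ¶
func (r *StateRepo) GetPlatform(id string) (*model.Platform, error)
GetPlatform returns one platform by ID.
func (*StateRepo) GetPlatformName ¶
func (r *StateRepo) GetPlatformName(id string) (string, error)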
GetPlatformName returns platform name by ID without decoding filter columns.
func (*StateRepo) GetSystemConfig ¶
func (r *StateRepo) GetSystemConfig() (*config.RuntimeConfig, int, error)
GetSystemConfig loads the runtime config and version from state.db. Returns nil config and version 0 if no row exists.
func (*StateRepo) ListAccountHeaderRules ¶
func (r *StateRepo) ListAccountHeaderRules() ([]model.AccountHeaderRule, error)
ListAccountHeaderRules returns all rules.
func (*StateRepo) ListPlatforms ¶
func (r *StateRepo) ListPlatforms() ([]model.Platform, error)
ListPlatforms returns all platforms.
func (*StateRepo) ListSubscriptions ¶
func (r *StateRepo) ListSubscriptions() ([]model.Subscription, error)
ListSubscriptions returns all subscriptions.
func (*StateRepo) SaveSystemConfig ¶
func (r *StateRepo) SaveSystemConfig(cfg *config.RuntimeConfig, version int, updatedAtNs int64) error
SaveSystemConfig persists the runtime config with the given version.
func (*StateRepo) UpsertAccountHeaderRuleWithCreated ¶
func (r *StateRepo) UpsertAccountHeaderRuleWithCreated(rule model.AccountHeaderRule) (bool, error)
UpsertAccountHeaderRuleWithCreated inserts or updates a rule by url_prefix and reports whether the row was newly created.
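The difference between EnsureAccountHeaderRule and UpsertAccountHeaderRuleWithCreated is what happens when the url_prefix already exists. The in-memory model below illustrates that contrast under a mutex; the rule fields and the ruleStore type are hypothetical, and the real methods operate on state.db.

```go
package main

import (
	"fmt"
	"sync"
)

// rule is a simplified stand-in; the Headers field is hypothetical.
type rule struct {
	URLPrefix string
	Headers   string
}

type ruleStore struct {
	mu    sync.Mutex // serializes writes
	rules map[string]rule
}

// ensure inserts only when the prefix is absent and reports whether a
// row was created. An existing row is left untouched.
func (s *ruleStore) ensure(r rule) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.rules[r.URLPrefix]; ok {
		return false
	}
	s.rules[r.URLPrefix] = r
	return true
}

// upsert always writes the row and reports whether it was newly created.
func (s *ruleStore) upsert(r rule) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, existed := s.rules[r.URLPrefix]
	s.rules[r.URLPrefix] = r
	return !existed
}

func main() {
	s := &ruleStore{rules: map[string]rule{}}
	fmt.Println(s.ensure(rule{"api.example.com", "X-Account"}))
	fmt.Println(s.ensure(rule{"api.example.com", "X-Other"}), s.rules["api.example.com"].Headers)
	fmt.Println(s.upsert(rule{"api.example.com", "X-Other"}), s.rules["api.example.com"].Headers)
}
```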
func (*StateRepo) UpsertPlatform ¶
func (r *StateRepo) UpsertPlatform(p model.Platform) error
UpsertPlatform inserts or updates a platform by ID. If the name collides with a different platform's name, ErrConflict is returned.
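The name-collision rule can be modeled in memory as follows. The platform fields, the store type, and the local errConflict are stand-ins; the real method presumably relies on a database constraint, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a local stand-in for state.ErrConflict.
var errConflict = errors.New("conflict")

type platform struct{ ID, Name string }

type platformStore struct{ byID map[string]platform }

// upsert inserts or updates by ID, rejecting a name already used by a
// different platform. Renaming a platform to its own name is allowed.
func (s *platformStore) upsert(p platform) error {
	for id, existing := range s.byID {
		if id != p.ID && existing.Name == p.Name {
			return errConflict
		}
	}
	s.byID[p.ID] = p
	return nil
}

func main() {
	s := &platformStore{byID: map[string]platform{}}
	fmt.Println(s.upsert(platform{"p1", "alpha"}))
	fmt.Println(s.upsert(platform{"p2", "alpha"}))
	fmt.Println(s.upsert(platform{"p1", "beta"}))
	fmt.Println(s.upsert(platform{"p2", "alpha"}))
}
```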
func (*StateRepo) UpsertSubscription ¶
func (r *StateRepo) UpsertSubscription(s model.Subscription) error
UpsertSubscription inserts or updates a subscription by ID. On update, created_at_ns is preserved (not overwritten).
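Preserving created_at_ns on update amounts to carrying the stored value forward before writing. In the sketch below, the subscription fields other than the creation timestamp, and the map-based store, are hypothetical.

```go
package main

import "fmt"

// subscription is a simplified stand-in for model.Subscription.
type subscription struct {
	ID          string
	URL         string
	CreatedAtNs int64
	UpdatedAtNs int64
}

// upsert writes s by ID. If a row already exists, its CreatedAtNs is
// kept and the incoming value is ignored.
func upsert(store map[string]subscription, s subscription) {
	if old, ok := store[s.ID]; ok {
		s.CreatedAtNs = old.CreatedAtNs
	}
	store[s.ID] = s
}

func main() {
	store := map[string]subscription{}
	upsert(store, subscription{ID: "s1", URL: "https://example.com/a", CreatedAtNs: 100, UpdatedAtNs: 100})
	upsert(store, subscription{ID: "s1", URL: "https://example.com/b", CreatedAtNs: 999, UpdatedAtNs: 200})
	fmt.Println(store["s1"].CreatedAtNs, store["s1"].UpdatedAtNs, store["s1"].URL)
}
```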
type SubscriptionNodeDirtyKey ¶
type SubscriptionNodeDirtyKey = model.SubscriptionNodeKey
SubscriptionNodeDirtyKey is the composite key for the subscription_nodes dirty set.