state

package
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 16, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package state implements the persistence layer: SQLite repos, StateEngine, dirty-set flush, consistency repair, and bootstrap.


Constants

This section is empty.

Variables

var ErrConflict = errors.New("conflict")

ErrConflict is returned when a write violates a uniqueness/conflict constraint.

var ErrNotFound = errors.New("not found")

ErrNotFound is returned when a requested resource does not exist in the database.
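
A caller would typically branch on these sentinels with errors.Is rather than comparing error strings. The sketch below redeclares the two variables locally so it compiles on its own; lookupPlatformName and classify are hypothetical helpers, and whether the package wraps the sentinels with extra context is not stated in this documentation (errors.Is works in both cases).

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared so this
// sketch is self-contained.
var (
	ErrNotFound = errors.New("not found")
	ErrConflict = errors.New("conflict")
)

// lookupPlatformName is a hypothetical helper. It wraps the sentinel with
// context via %w, so errors.Is still matches it.
func lookupPlatformName(names map[string]string, id string) (string, error) {
	name, ok := names[id]
	if !ok {
		return "", fmt.Errorf("platform %q: %w", id, ErrNotFound)
	}
	return name, nil
}

// classify maps an error to a coarse outcome a caller might act on.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotFound):
		return "missing"
	case errors.Is(err, ErrConflict):
		return "conflict"
	default:
		return "internal"
	}
}

func main() {
	names := map[string]string{"p1": "default"}
	_, err := lookupPlatformName(names, "p2")
	fmt.Println(classify(err))
}
```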

Functions

func InitDB

func InitDB(db *sql.DB, ddl string) error

InitDB executes raw DDL statements on the given database. Used by non-state SQLite stores (for example metrics and request logs).

func MigrateCacheDB

func MigrateCacheDB(db *sql.DB) error

MigrateCacheDB applies cache.db migrations.

func MigrateStateDB

func MigrateStateDB(db *sql.DB) error

MigrateStateDB applies state.db migrations.

func OpenDB

func OpenDB(path string) (*sql.DB, error)

OpenDB opens (or creates) a SQLite database at path with recommended pragmas: WAL journal mode, synchronous=NORMAL, foreign_keys=ON, busy_timeout=5000.
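
The four pragmas correspond to ordinary SQLite PRAGMA statements. The sketch below shows one way they could be applied through an Exec-style interface; it is an assumption about mechanics, since this documentation does not say whether OpenDB uses statements or driver DSN parameters. The execer interface, applyPragmas, and recorder are hypothetical names; *sql.DB satisfies execer.

```go
package main

import (
	"database/sql"
	"fmt"
)

// execer is the subset of *sql.DB this sketch needs.
type execer interface {
	Exec(query string, args ...any) (sql.Result, error)
}

// pragmas mirrors the settings listed in the OpenDB description.
var pragmas = []string{
	"PRAGMA journal_mode=WAL",
	"PRAGMA synchronous=NORMAL",
	"PRAGMA foreign_keys=ON",
	"PRAGMA busy_timeout=5000",
}

// applyPragmas runs each statement in order and stops at the first error.
func applyPragmas(db execer) error {
	for _, p := range pragmas {
		if _, err := db.Exec(p); err != nil {
			return fmt.Errorf("%s: %w", p, err)
		}
	}
	return nil
}

// recorder is a fake execer that records statements instead of running them,
// so the sketch runs without a SQLite driver.
type recorder struct{ got []string }

func (r *recorder) Exec(q string, _ ...any) (sql.Result, error) {
	r.got = append(r.got, q)
	return nil, nil
}

func main() {
	r := &recorder{}
	if err := applyPragmas(r); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, q := range r.got {
		fmt.Println(q)
	}
}
```

Note that foreign_keys, synchronous, and busy_timeout are per-connection settings in SQLite, so with a database/sql connection pool a statement-based approach only affects the connection that ran it. Driver DSN parameters or a single-connection pool avoid that.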

func RepairConsistency

func RepairConsistency(stateDBPath string, cacheDB *sql.DB) error

RepairConsistency runs orphan-cleanup SQL on cache.db, cross-referencing state.db via ATTACH. All DELETEs execute in a single transaction to avoid half-repaired state on crash.

Cleanup order (by dependency):

  1. subscription_nodes: remove entries whose subscription_id is missing from state.subscriptions OR (for non-evicted rows) whose node_hash is missing from nodes_static.
  2. nodes_static: remove entries with no remaining non-evicted reference in subscription_nodes.
  3. nodes_dynamic: remove entries whose hash is missing from nodes_static.
  4. node_latency: remove entries whose node_hash is missing from nodes_static.
  5. leases: remove entries whose platform_id is missing from state.platforms OR whose node_hash is missing from nodes_static.
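
The dependency order matters because each step reads the result of the one before it. The sketch below models the five steps on plain in-memory maps and slices rather than SQL; every type and field name is hypothetical, and it is only meant to show why, for example, nodes_static must be pruned before nodes_dynamic.

```go
package main

import "fmt"

type subNode struct {
	SubID, NodeHash string
	Evicted         bool
}
type latency struct{ NodeHash, Domain string }
type lease struct{ PlatformID, NodeHash string }

// cache is a toy stand-in for the cache.db tables.
type cache struct {
	SubNodes     []subNode
	NodesStatic  map[string]bool
	NodesDynamic map[string]bool
	Latency      []latency
	Leases       []lease
}

// keep returns the elements of in for which ok reports true.
func keep[T any](in []T, ok func(T) bool) []T {
	out := make([]T, 0, len(in))
	for _, v := range in {
		if ok(v) {
			out = append(out, v)
		}
	}
	return out
}

// repair applies the five cleanup steps in dependency order.
// subs and platforms stand in for state.subscriptions and state.platforms.
func repair(c *cache, subs, platforms map[string]bool) {
	// 1. subscription_nodes: drop unknown subscriptions and non-evicted
	// rows pointing at a missing static node.
	c.SubNodes = keep(c.SubNodes, func(s subNode) bool {
		return subs[s.SubID] && (s.Evicted || c.NodesStatic[s.NodeHash])
	})
	// 2. nodes_static: drop hashes with no remaining non-evicted reference.
	referenced := map[string]bool{}
	for _, s := range c.SubNodes {
		if !s.Evicted {
			referenced[s.NodeHash] = true
		}
	}
	for h := range c.NodesStatic {
		if !referenced[h] {
			delete(c.NodesStatic, h)
		}
	}
	// 3. nodes_dynamic: drop hashes missing from nodes_static.
	for h := range c.NodesDynamic {
		if !c.NodesStatic[h] {
			delete(c.NodesDynamic, h)
		}
	}
	// 4. node_latency: drop rows whose node is gone.
	c.Latency = keep(c.Latency, func(l latency) bool { return c.NodesStatic[l.NodeHash] })
	// 5. leases: drop rows with an unknown platform or a missing node.
	c.Leases = keep(c.Leases, func(l lease) bool {
		return platforms[l.PlatformID] && c.NodesStatic[l.NodeHash]
	})
}

// sample builds a cache containing one orphan of each kind.
func sample() *cache {
	return &cache{
		SubNodes: []subNode{
			{"s1", "a", false}, {"s2", "b", false}, {"s1", "c", false}, {"s1", "d", true},
		},
		NodesStatic:  map[string]bool{"a": true, "b": true, "d": true},
		NodesDynamic: map[string]bool{"a": true, "b": true},
		Latency:      []latency{{"a", "example.com"}, {"b", "example.com"}},
		Leases:       []lease{{"p1", "a"}, {"p1", "b"}, {"p2", "a"}},
	}
}

func main() {
	c := sample()
	repair(c, map[string]bool{"s1": true}, map[string]bool{"p1": true})
	fmt.Println(len(c.SubNodes), len(c.NodesStatic), len(c.NodesDynamic), len(c.Latency), len(c.Leases))
}
```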

Types

type CacheFlushWorker

type CacheFlushWorker struct {
	// contains filtered or unexported fields
}

CacheFlushWorker periodically flushes dirty sets to cache.db. It triggers a flush when:

  • DirtyCount() >= threshold, OR
  • time.Since(lastFlush) >= interval (and dirty count > 0)

On Stop(), a final flush is performed before returning.
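
The two trigger conditions can be written as a single predicate. This is a minimal sketch following the wording above literally; shouldFlush is a hypothetical name, and under this literal reading a threshold of zero or less would make every check return true.

```go
package main

import (
	"fmt"
	"time"
)

// shouldFlush reports whether a flush is due: either the dirty count has
// reached the threshold, or the interval has elapsed and something is dirty.
func shouldFlush(dirty, threshold int, sinceLast, interval time.Duration) bool {
	if dirty >= threshold {
		return true
	}
	return dirty > 0 && sinceLast >= interval
}

func main() {
	fmt.Println(shouldFlush(1000, 1000, time.Second, time.Minute)) // threshold reached
	fmt.Println(shouldFlush(3, 1000, 2*time.Minute, time.Minute))  // interval elapsed, dirty
	fmt.Println(shouldFlush(0, 1000, 2*time.Minute, time.Minute))  // interval elapsed, clean
	fmt.Println(shouldFlush(3, 1000, time.Second, time.Minute))    // neither condition
}
```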

func NewCacheFlushWorker

func NewCacheFlushWorker(
	engine *StateEngine,
	readers CacheReaders,
	thresholdFn func() int,
	intervalFn func() time.Duration,
	checkTick time.Duration,
) *CacheFlushWorker

NewCacheFlushWorker creates a flush worker that pulls threshold/interval from callbacks on each check cycle. checkTick controls how often flush conditions are evaluated (e.g. 5s).

func (*CacheFlushWorker) Start

func (w *CacheFlushWorker) Start()

Start launches the background flush goroutine.

func (*CacheFlushWorker) Stop

func (w *CacheFlushWorker) Stop()

Stop signals the worker to stop and performs a final flush. Blocks until the goroutine exits.
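
A common way to get "final flush, then block until exit" is a stop channel plus a done channel. The sketch below is one such arrangement and is not taken from the package source; worker, newWorker, and demo are hypothetical names, and due stands in for the threshold/interval check.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct {
	tick  time.Duration
	due   func() bool // evaluated on every tick
	flush func()
	stop  chan struct{}
	done  chan struct{}
	once  sync.Once
}

func newWorker(tick time.Duration, due func() bool, flush func()) *worker {
	return &worker{
		tick: tick, due: due, flush: flush,
		stop: make(chan struct{}), done: make(chan struct{}),
	}
}

// Start launches the background loop.
func (w *worker) Start() {
	go func() {
		defer close(w.done)
		t := time.NewTicker(w.tick)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				if w.due() {
					w.flush()
				}
			case <-w.stop:
				w.flush() // final flush before exiting
				return
			}
		}
	}()
}

// Stop signals the loop and waits for it to finish. It must be called
// after Start, otherwise it blocks forever in this sketch.
func (w *worker) Stop() {
	w.once.Do(func() { close(w.stop) })
	<-w.done
}

// demo starts a worker whose ticker never fires in practice and returns
// how many flushes ran, which is just the final one triggered by Stop.
func demo() int {
	flushes := 0
	w := newWorker(time.Hour, func() bool { return false }, func() { flushes++ })
	w.Start()
	w.Stop()
	return flushes
}

func main() {
	fmt.Println(demo())
}
```

Reading the counter after Stop is safe here because the done channel is closed only after the final flush has returned.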

type CacheReaders

type CacheReaders struct {
	ReadNodeStatic       func(hash string) *model.NodeStatic
	ReadNodeDynamic      func(hash string) *model.NodeDynamic
	ReadNodeLatency      func(key NodeLatencyDirtyKey) *model.NodeLatency
	ReadLease            func(key LeaseDirtyKey) *model.Lease
	ReadSubscriptionNode func(key SubscriptionNodeDirtyKey) *model.SubscriptionNode
}

CacheReaders provides callbacks for reading current in-memory values at flush time. If a reader returns nil for a key marked OpUpsert, the key is treated as a delete (the object was removed between mark and flush).
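
The nil-as-delete rule can be sketched for a single table as follows. DirtyOp and its constants mirror the declarations documented below; NodeStatic here is a local stand-in with a single hypothetical field, and resolve is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sort"
)

type DirtyOp int

const (
	OpUpsert DirtyOp = iota
	OpDelete
)

// NodeStatic is a local stand-in for model.NodeStatic.
type NodeStatic struct{ Hash string }

// resolve splits a drained snapshot into values to upsert and keys to delete.
// A key marked OpUpsert whose reader returns nil is treated as a delete.
func resolve(snapshot map[string]DirtyOp, read func(hash string) *NodeStatic) (upserts []NodeStatic, deletes []string) {
	for hash, op := range snapshot {
		if op == OpUpsert {
			if v := read(hash); v != nil {
				upserts = append(upserts, *v)
				continue
			}
		}
		deletes = append(deletes, hash)
	}
	return upserts, deletes
}

func main() {
	memory := map[string]*NodeStatic{"a": {Hash: "a"}}
	snapshot := map[string]DirtyOp{
		"a": OpUpsert, // still in memory: upsert
		"b": OpUpsert, // removed after being marked: becomes a delete
		"c": OpDelete, // explicit delete
	}
	upserts, deletes := resolve(snapshot, func(h string) *NodeStatic { return memory[h] })
	sort.Strings(deletes) // map iteration order is random
	fmt.Println(len(upserts), deletes)
}
```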

type CacheRepo

type CacheRepo struct {
	// contains filtered or unexported fields
}

CacheRepo wraps cache.db and provides batch read/write for weak-persist data.

func (*CacheRepo) BulkDeleteLeases

func (r *CacheRepo) BulkDeleteLeases(keys []model.LeaseKey) error

BulkDeleteLeases batch-deletes lease records by composite key.

func (*CacheRepo) BulkDeleteNodeLatency

func (r *CacheRepo) BulkDeleteNodeLatency(keys []model.NodeLatencyKey) error

BulkDeleteNodeLatency batch-deletes node latency records by composite key.

func (*CacheRepo) BulkDeleteNodesDynamic

func (r *CacheRepo) BulkDeleteNodesDynamic(hashes []string) error

BulkDeleteNodesDynamic batch-deletes node dynamic records by hash.

func (*CacheRepo) BulkDeleteNodesStatic

func (r *CacheRepo) BulkDeleteNodesStatic(hashes []string) error

BulkDeleteNodesStatic batch-deletes node static records by hash.

func (*CacheRepo) BulkDeleteSubscriptionNodes

func (r *CacheRepo) BulkDeleteSubscriptionNodes(keys []model.SubscriptionNodeKey) error

BulkDeleteSubscriptionNodes batch-deletes subscription-node links by composite key.

func (*CacheRepo) BulkUpsertLeases

func (r *CacheRepo) BulkUpsertLeases(leases []model.Lease) error

BulkUpsertLeases batch-inserts or updates lease records.

func (*CacheRepo) BulkUpsertNodeLatency

func (r *CacheRepo) BulkUpsertNodeLatency(entries []model.NodeLatency) error

BulkUpsertNodeLatency batch-inserts or updates node latency records.

func (*CacheRepo) BulkUpsertNodesDynamic

func (r *CacheRepo) BulkUpsertNodesDynamic(nodes []model.NodeDynamic) error

BulkUpsertNodesDynamic batch-inserts or updates node dynamic records.

func (*CacheRepo) BulkUpsertNodesStatic

func (r *CacheRepo) BulkUpsertNodesStatic(nodes []model.NodeStatic) error

BulkUpsertNodesStatic batch-inserts or updates node static records.

func (*CacheRepo) BulkUpsertSubscriptionNodes

func (r *CacheRepo) BulkUpsertSubscriptionNodes(nodes []model.SubscriptionNode) error

BulkUpsertSubscriptionNodes batch-inserts or updates subscription-node links.

func (*CacheRepo) FlushTx

func (r *CacheRepo) FlushTx(ops FlushOps) error

FlushTx executes all upserts and deletes in a single transaction.

Upsert order: nodes_static → subscription_nodes → nodes_dynamic → node_latency → leases

Delete order: leases → node_latency → nodes_dynamic → subscription_nodes → nodes_static
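
The delete order is the exact reverse of the upsert order. A plausible reading, not confirmed by this documentation, is that referenced rows are written first and removed last. The sketch below only demonstrates the reversal; upsertOrder and reversed are hypothetical names.

```go
package main

import "fmt"

// upsertOrder lists the tables in the order FlushTx is documented to upsert.
var upsertOrder = []string{
	"nodes_static",
	"subscription_nodes",
	"nodes_dynamic",
	"node_latency",
	"leases",
}

// reversed returns a reversed copy of in, leaving in untouched.
func reversed(in []string) []string {
	out := make([]string, len(in))
	for i, v := range in {
		out[len(in)-1-i] = v
	}
	return out
}

func main() {
	fmt.Println("upsert:", upsertOrder)
	fmt.Println("delete:", reversed(upsertOrder))
}
```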

func (*CacheRepo) LoadAllLeases

func (r *CacheRepo) LoadAllLeases() ([]model.Lease, error)

LoadAllLeases reads all lease records.

func (*CacheRepo) LoadAllNodeLatency

func (r *CacheRepo) LoadAllNodeLatency() ([]model.NodeLatency, error)

LoadAllNodeLatency reads all node latency records.

func (*CacheRepo) LoadAllNodesDynamic

func (r *CacheRepo) LoadAllNodesDynamic() ([]model.NodeDynamic, error)

LoadAllNodesDynamic reads all node dynamic records.

func (*CacheRepo) LoadAllNodesStatic

func (r *CacheRepo) LoadAllNodesStatic() ([]model.NodeStatic, error)

LoadAllNodesStatic reads all node static records.

func (*CacheRepo) LoadAllSubscriptionNodes

func (r *CacheRepo) LoadAllSubscriptionNodes() ([]model.SubscriptionNode, error)

LoadAllSubscriptionNodes reads all subscription-node links.

type DirtyOp

type DirtyOp int

DirtyOp represents the type of dirty operation.

const (
	// OpUpsert marks a key for upsert (value read from memory at flush time).
	OpUpsert DirtyOp = iota
	// OpDelete marks a key for deletion.
	OpDelete
)

type DirtySet

type DirtySet[K comparable] struct {
	// contains filtered or unexported fields
}

DirtySet tracks dirty keys with their operation type. It stores only keys — values are read from memory at flush time. Thread-safe via mutex; drain uses map-swap for a stable snapshot.

func NewDirtySet

func NewDirtySet[K comparable]() *DirtySet[K]

NewDirtySet creates an empty DirtySet.

func (*DirtySet[K]) Drain

func (d *DirtySet[K]) Drain() map[K]DirtyOp

Drain atomically swaps the internal map with a fresh one and returns the old map as a stable snapshot. Concurrent marks after Drain go into the new map.
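
A minimal sketch of the map-swap technique, assuming a mutex-guarded map and last-mark-wins semantics for repeated marks on the same key (the latter is an assumption). dirtySet and its methods are local stand-ins, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type DirtyOp int

const (
	OpUpsert DirtyOp = iota
	OpDelete
)

type dirtySet[K comparable] struct {
	mu sync.Mutex
	m  map[K]DirtyOp
}

func newDirtySet[K comparable]() *dirtySet[K] {
	return &dirtySet[K]{m: make(map[K]DirtyOp)}
}

// mark records op for key, replacing any earlier mark for the same key.
func (d *dirtySet[K]) mark(key K, op DirtyOp) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.m[key] = op
}

func (d *dirtySet[K]) Len() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.m)
}

// Drain swaps in a fresh map under the lock and returns the old one.
// No copying is needed, and the returned map is never written again.
func (d *dirtySet[K]) Drain() map[K]DirtyOp {
	d.mu.Lock()
	defer d.mu.Unlock()
	old := d.m
	d.m = make(map[K]DirtyOp)
	return old
}

func main() {
	d := newDirtySet[string]()
	d.mark("a", OpUpsert)
	d.mark("b", OpDelete)
	snapshot := d.Drain()
	d.mark("c", OpUpsert) // lands in the new map, not in snapshot
	fmt.Println(len(snapshot), d.Len())
}
```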

func (*DirtySet[K]) Len

func (d *DirtySet[K]) Len() int

Len returns the current number of dirty entries.

func (*DirtySet[K]) MarkDelete

func (d *DirtySet[K]) MarkDelete(key K)

MarkDelete marks a key for deletion.

func (*DirtySet[K]) MarkUpsert

func (d *DirtySet[K]) MarkUpsert(key K)

MarkUpsert marks a key for upsert.

func (*DirtySet[K]) Merge

func (d *DirtySet[K]) Merge(old map[K]DirtyOp)

Merge re-merges a previously drained snapshot back into the dirty set. Used for flush-failure recovery. Only keys that have NOT been re-dirtied since the drain are restored, preserving newer marks.
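
The "preserve newer marks" rule reduces to: restore a key only if it is absent from the current map. The sketch below shows that rule on bare maps, with locking omitted for brevity; mergeBack is a hypothetical name.

```go
package main

import "fmt"

type DirtyOp int

const (
	OpUpsert DirtyOp = iota
	OpDelete
)

// mergeBack restores entries from a failed flush's snapshot into current,
// skipping any key that was marked again after the drain.
func mergeBack[K comparable](current, old map[K]DirtyOp) {
	for k, op := range old {
		if _, redirtied := current[k]; !redirtied {
			current[k] = op
		}
	}
}

func main() {
	// "a" was drained as an upsert, then deleted while the flush was running.
	current := map[string]DirtyOp{"a": OpDelete}
	old := map[string]DirtyOp{"a": OpUpsert, "b": OpUpsert}
	mergeBack(current, old)
	fmt.Println(current["a"] == OpDelete, current["b"] == OpUpsert, len(current))
}
```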

type FlushOps

type FlushOps struct {
	UpsertNodesStatic       []model.NodeStatic
	DeleteNodesStatic       []string
	UpsertSubscriptionNodes []model.SubscriptionNode
	DeleteSubscriptionNodes []model.SubscriptionNodeKey
	UpsertNodesDynamic      []model.NodeDynamic
	DeleteNodesDynamic      []string
	UpsertNodeLatency       []model.NodeLatency
	DeleteNodeLatency       []model.NodeLatencyKey
	UpsertLeases            []model.Lease
	DeleteLeases            []model.LeaseKey
}

FlushOps holds all upsert/delete slices for a single-transaction cache flush.

type LeaseDirtyKey

type LeaseDirtyKey = model.LeaseKey

LeaseDirtyKey is the composite key for the leases dirty set.

type NodeLatencyDirtyKey

type NodeLatencyDirtyKey = model.NodeLatencyKey

NodeLatencyDirtyKey is the composite key for the node_latency dirty set.

type StateEngine

type StateEngine struct {
	*StateRepo
	*CacheRepo
	// contains filtered or unexported fields
}

StateEngine is the single write entry point for all persistence operations. Strong-persist data (config, platforms, subscriptions, rules) goes through transactional writes to state.db. Weak-persist data (nodes, leases) is marked dirty and batch-flushed to cache.db.

func PersistenceBootstrap

func PersistenceBootstrap(stateDir, cacheDir string) (engine *StateEngine, closer io.Closer, err error)

PersistenceBootstrap initializes both databases, runs consistency repair, and returns a ready-to-use StateEngine plus an io.Closer for the DB handles.

Steps:

  1. Open/create state.db and cache.db with recommended pragmas.
  2. Run schema migrations on both databases.
  3. Run consistency repair (cross-db orphan cleanup).
  4. Construct and return StateEngine.

func (*StateEngine) DirtyCount

func (e *StateEngine) DirtyCount() int

DirtyCount returns the total number of dirty entries across all sets.

func (*StateEngine) FlushDirtySets

func (e *StateEngine) FlushDirtySets(readers CacheReaders) error

FlushDirtySets drains all dirty sets, reads current values via readers, and batch-writes to cache.db in a single transaction. On failure, the drained entries are merged back into their dirty sets so they are retried on the next flush.

func (*StateEngine) MarkLease

func (e *StateEngine) MarkLease(platformID, account string)

func (*StateEngine) MarkLeaseDelete

func (e *StateEngine) MarkLeaseDelete(platformID, account string)

func (*StateEngine) MarkNodeDynamic

func (e *StateEngine) MarkNodeDynamic(hash string)

func (*StateEngine) MarkNodeDynamicDelete

func (e *StateEngine) MarkNodeDynamicDelete(hash string)

func (*StateEngine) MarkNodeLatency

func (e *StateEngine) MarkNodeLatency(nodeHash, domain string)

func (*StateEngine) MarkNodeLatencyDelete

func (e *StateEngine) MarkNodeLatencyDelete(nodeHash, domain string)

func (*StateEngine) MarkNodeStatic

func (e *StateEngine) MarkNodeStatic(hash string)

func (*StateEngine) MarkNodeStaticDelete

func (e *StateEngine) MarkNodeStaticDelete(hash string)

func (*StateEngine) MarkSubscriptionNode

func (e *StateEngine) MarkSubscriptionNode(subID, nodeHash string)

func (*StateEngine) MarkSubscriptionNodeDelete

func (e *StateEngine) MarkSubscriptionNodeDelete(subID, nodeHash string)

type StateRepo

type StateRepo struct {
	// contains filtered or unexported fields
}

StateRepo wraps state.db and provides transactional CRUD for strong-persist data. All writes are serialized by an internal mutex.

func (*StateRepo) DeleteAccountHeaderRule

func (r *StateRepo) DeleteAccountHeaderRule(prefix string) error

DeleteAccountHeaderRule removes a rule by url_prefix.

func (*StateRepo) DeletePlatform

func (r *StateRepo) DeletePlatform(id string) error

DeletePlatform removes a platform by ID.

func (*StateRepo) DeleteSubscription

func (r *StateRepo) DeleteSubscription(id string) error

DeleteSubscription removes a subscription by ID.

func (*StateRepo) EnsureAccountHeaderRule

func (r *StateRepo) EnsureAccountHeaderRule(rule model.AccountHeaderRule) (bool, error)

EnsureAccountHeaderRule inserts a rule by url_prefix only when it does not already exist and reports whether the row was newly created.

func (*StateRepo) GetPlatform

func (r *StateRepo) GetPlatform(id string) (*model.Platform, error)

GetPlatform returns one platform by ID.

func (*StateRepo) GetPlatformName

func (r *StateRepo) GetPlatformName(id string) (string, error)

GetPlatformName returns a platform's name by ID without decoding the filter columns.

func (*StateRepo) GetSystemConfig

func (r *StateRepo) GetSystemConfig() (*config.RuntimeConfig, int, error)

GetSystemConfig loads the runtime config and version from state.db. Returns nil config and version 0 if no row exists.

func (*StateRepo) ListAccountHeaderRules

func (r *StateRepo) ListAccountHeaderRules() ([]model.AccountHeaderRule, error)

ListAccountHeaderRules returns all rules.

func (*StateRepo) ListPlatforms

func (r *StateRepo) ListPlatforms() ([]model.Platform, error)

ListPlatforms returns all platforms.

func (*StateRepo) ListSubscriptions

func (r *StateRepo) ListSubscriptions() ([]model.Subscription, error)

ListSubscriptions returns all subscriptions.

func (*StateRepo) SaveSystemConfig

func (r *StateRepo) SaveSystemConfig(cfg *config.RuntimeConfig, version int, updatedAtNs int64) error

SaveSystemConfig persists the runtime config with the given version.

func (*StateRepo) UpsertAccountHeaderRuleWithCreated

func (r *StateRepo) UpsertAccountHeaderRuleWithCreated(rule model.AccountHeaderRule) (bool, error)

UpsertAccountHeaderRuleWithCreated inserts or updates a rule by url_prefix and reports whether the row was newly created.
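
EnsureAccountHeaderRule and UpsertAccountHeaderRuleWithCreated both report whether a row was created; they differ in what happens when the url_prefix already exists. The in-memory sketch below illustrates that difference only. Rule, its Headers field, ensure, and upsert are hypothetical stand-ins, not the package's types or SQL.

```go
package main

import "fmt"

// Rule is a local stand-in for model.AccountHeaderRule.
type Rule struct {
	URLPrefix string
	Headers   []string // hypothetical payload field
}

// ensure inserts r only if its prefix is absent and leaves an existing
// row untouched.
func ensure(store map[string]Rule, r Rule) (created bool) {
	if _, ok := store[r.URLPrefix]; ok {
		return false
	}
	store[r.URLPrefix] = r
	return true
}

// upsert always writes r and reports whether the prefix was new.
func upsert(store map[string]Rule, r Rule) (created bool) {
	_, existed := store[r.URLPrefix]
	store[r.URLPrefix] = r
	return !existed
}

func main() {
	store := map[string]Rule{}
	fmt.Println(ensure(store, Rule{URLPrefix: "/api", Headers: []string{"X-Old"}}))
	fmt.Println(ensure(store, Rule{URLPrefix: "/api", Headers: []string{"X-New"}}), store["/api"].Headers)
	fmt.Println(upsert(store, Rule{URLPrefix: "/api", Headers: []string{"X-New"}}), store["/api"].Headers)
}
```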

func (*StateRepo) UpsertPlatform

func (r *StateRepo) UpsertPlatform(p model.Platform) error

UpsertPlatform inserts or updates a platform by ID. If the name collides with a different platform's name, ErrConflict is returned.

func (*StateRepo) UpsertSubscription

func (r *StateRepo) UpsertSubscription(s model.Subscription) error

UpsertSubscription inserts or updates a subscription by ID. On update, created_at_ns is preserved (not overwritten).
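
The created_at_ns rule can be illustrated with an in-memory map. This is a sketch of the observable behavior only; Subscription and its fields other than the creation timestamp are hypothetical, and the package itself does this in SQL against state.db.

```go
package main

import "fmt"

// Subscription is a local stand-in for model.Subscription.
type Subscription struct {
	ID          string
	Name        string // hypothetical field
	CreatedAtNs int64
	UpdatedAtNs int64 // hypothetical field
}

// upsertSubscription writes s by ID. On update, the stored creation
// timestamp wins over whatever the caller passed in.
func upsertSubscription(store map[string]Subscription, s Subscription) {
	if prev, ok := store[s.ID]; ok {
		s.CreatedAtNs = prev.CreatedAtNs
	}
	store[s.ID] = s
}

func main() {
	store := map[string]Subscription{}
	upsertSubscription(store, Subscription{ID: "s1", Name: "first", CreatedAtNs: 100, UpdatedAtNs: 100})
	upsertSubscription(store, Subscription{ID: "s1", Name: "renamed", CreatedAtNs: 999, UpdatedAtNs: 200})
	got := store["s1"]
	fmt.Println(got.Name, got.CreatedAtNs, got.UpdatedAtNs)
}
```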

type SubscriptionNodeDirtyKey

type SubscriptionNodeDirtyKey = model.SubscriptionNodeKey

SubscriptionNodeDirtyKey is the composite key for the subscription_nodes dirty set.
