store

package
v0.0.0-...-2f11b0e Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound = errors.New("key not found")
)

Functions

This section is empty.

Types

type BadgerIndexStore

type BadgerIndexStore struct {
	// contains filtered or unexported fields
}

BadgerIndexStore is a BadgerDB implementation of store.IndexStore.

func NewBadgerIndexStore

func NewBadgerIndexStore(db *badger.DB) *BadgerIndexStore

NewBadgerIndexStore creates a new BadgerDB-backed index store.

func (*BadgerIndexStore) Close

func (s *BadgerIndexStore) Close() error

Close closes the underlying BadgerDB database if it is not already closed.

func (*BadgerIndexStore) Delete

func (s *BadgerIndexStore) Delete(key []byte) error

Delete removes an index entry.

func (*BadgerIndexStore) ReverseScan

func (s *BadgerIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error

ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.

func (*BadgerIndexStore) Scan

func (s *BadgerIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error

Scan iterates over keys in the range [start, end) in lexicographic order.

func (*BadgerIndexStore) ScanPrefix

func (s *BadgerIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error

ScanPrefix iterates over all keys with the given prefix.

func (*BadgerIndexStore) Set

func (s *BadgerIndexStore) Set(key []byte, value []byte) error

Set stores an index entry.

type BadgerKVStore

type BadgerKVStore struct {
	// contains filtered or unexported fields
}

BadgerKVStore is a BadgerDB implementation of store.KVStore.

func NewBadgerKVStore

func NewBadgerKVStore(db *badger.DB) *BadgerKVStore

NewBadgerKVStore creates a new BadgerDB-backed KV store.

func (*BadgerKVStore) Close

func (s *BadgerKVStore) Close() error

Close closes the underlying BadgerDB database if it is not already closed.

func (*BadgerKVStore) Delete

func (s *BadgerKVStore) Delete(key string) error

Delete removes a key.

func (*BadgerKVStore) Get

func (s *BadgerKVStore) Get(key string) ([]byte, error)

Get retrieves the value for a key.

func (*BadgerKVStore) ScanPrefix

func (s *BadgerKVStore) ScanPrefix(prefix string, fn func(key string, value []byte) bool) error

ScanPrefix iterates over all keys with the given prefix.

func (*BadgerKVStore) Set

func (s *BadgerKVStore) Set(key string, value []byte) error

Set stores a key-value pair.

type EntityStore

type EntityStore struct {
	// contains filtered or unexported fields
}

EntityStore is the main entry point for entity storage and retrieval.

func NewEmbeddedStore

func NewEmbeddedStore(cfg *config.Config) (*EntityStore, error)

func NewEntityStore

func NewEntityStore(kv KVStore, indexStore IndexStore, registry *index.Registry, serializer Serializer) (*EntityStore, error)

NewEntityStore creates a new EntityStore.

func NewMemoryStore

func NewMemoryStore(cfg *config.Config) (*EntityStore, error)

func (*EntityStore) Close

func (s *EntityStore) Close(ctx context.Context) error

Close releases resources held by the store.

func (*EntityStore) DeleteEntity

func (s *EntityStore) DeleteEntity(entityType, entityID string) error

DeleteEntity removes ALL versions of an entity. TODO: improve error handling in this function

func (*EntityStore) DeleteExpiredBatch

func (s *EntityStore) DeleteExpiredBatch(entityType string, cutoffNS int64, limit int) (int, error)

DeleteExpiredBatch deletes up to limit expired entity versions (timestamps before cutoffNS). Returns the number of versions deleted.

func (*EntityStore) DeleteVersion

func (s *EntityStore) DeleteVersion(e *entity.Entity) error

DeleteVersion removes a specific version of an entity and all its index entries. If the deleted version is the latest one, the _latest indexes are updated to point to the new latest version. TODO: improve error handling in this function

func (*EntityStore) Get

func (s *EntityStore) Get(entityType, entityID string) (*entity.Entity, error)

Get retrieves the latest version of an entity by type and ID. It uses a reverse scan of the _by_id index to find the most recent timestamp.

func (*EntityStore) GetAt

func (s *EntityStore) GetAt(entityType, entityID string, asOfTime int64) (*entity.Entity, error)

GetAt retrieves the latest version of an entity as of a specific point in time. Returns the most recent version where timestamp <= asOfTime.

func (*EntityStore) GetHistory

func (s *EntityStore) GetHistory(entityType, entityID string, opts *HistoryOptions) ([]*entity.Entity, error)

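GetHistory retrieves all versions of an entity, subject to opts: an optional time range, a result limit, and newest-first ordering when Reverse is set.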

func (*EntityStore) GetVersion

func (s *EntityStore) GetVersion(entityType, entityID string, timestamp int64) (*entity.Entity, error)

GetVersion retrieves a specific version of an entity by type, ID, and timestamp.

func (*EntityStore) Query

func (s *EntityStore) Query(q *Query) ([]*entity.Entity, error)

Query executes a query and returns matching entities. TODO: figure out multi-tenancy within the database

func (*EntityStore) Reindex

func (s *EntityStore) Reindex(entityType string) error

Reindex rebuilds all indexes for a given entity type. This clears existing indexes and re-indexes all entities from the KV store.

func (*EntityStore) Write

func (s *EntityStore) Write(e *entity.Entity) error

Write stores an entity version and updates all configured indexes.

type FieldFilter

type FieldFilter struct {
	Field string
	Op    Op
	Value entity.Value
}

FieldFilter specifies a filter on a single field.

type HistoryOptions

type HistoryOptions struct {
	TimeRange *TimeRange
	Limit     int
	Reverse   bool // true = newest first (descending timestamp)
}

HistoryOptions configures a GetHistory query.

type IndexStore

type IndexStore interface {
	// Set stores an index entry (value is typically empty).
	Set(key []byte, value []byte) error

	// Delete removes an index entry.
	Delete(key []byte) error

	// Scan iterates over keys in the range [start, end) in lexicographic order.
	// The callback receives each key; return false to stop iteration.
	Scan(start, end []byte, fn func(key []byte) bool) error

	// ScanPrefix iterates over all keys with the given prefix.
	// The callback receives each key; return false to stop iteration.
	ScanPrefix(prefix []byte, fn func(key []byte) bool) error

	// ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.
	// The callback receives each key; return false to stop iteration.
	ReverseScan(start, end []byte, fn func(key []byte) bool) error

	// Close releases any resources held by the store.
	Close() error
}

IndexStore is the interface for index storage with ordered key scans. Keys follow the schema: {entity_type}/{field_name}/{field_value_bytes}/{timestamp_unix_ns}/{entity_id}

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer implements Serializer using JSON encoding.

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSON serializer.

func (*JSONSerializer) Marshal

func (s *JSONSerializer) Marshal(e *entity.Entity) ([]byte, error)

Marshal encodes an entity to JSON.

func (*JSONSerializer) Unmarshal

func (s *JSONSerializer) Unmarshal(data []byte, e *entity.Entity) error

Unmarshal decodes JSON into an entity.

type KVStore

type KVStore interface {
	// Get retrieves the value for a key. Returns ErrNotFound if key doesn't exist.
	Get(key string) ([]byte, error)

	// Set stores a key-value pair.
	Set(key string, value []byte) error

	// Delete removes a key.
	Delete(key string) error

	// ScanPrefix iterates over all keys with the given prefix.
	// The callback receives each key and value; return false to stop iteration.
	ScanPrefix(prefix string, fn func(key string, value []byte) bool) error

	// Close releases any resources held by the store.
	Close() error
}

KVStore is the interface for key-value storage of full entities. Keys follow the schema: {entity_type}:{entity_id}

type MemoryIndexStore

type MemoryIndexStore struct {
	// contains filtered or unexported fields
}

MemoryIndexStore is an in-memory implementation of store.IndexStore using a sorted slice.

func NewMemoryIndexStore

func NewMemoryIndexStore() *MemoryIndexStore

NewMemoryIndexStore creates a new in-memory index store.

func (*MemoryIndexStore) Close

func (s *MemoryIndexStore) Close() error

Close releases resources.

func (*MemoryIndexStore) Delete

func (s *MemoryIndexStore) Delete(key []byte) error

Delete removes an index entry.

func (*MemoryIndexStore) ReverseScan

func (s *MemoryIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error

ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.

func (*MemoryIndexStore) Scan

func (s *MemoryIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error

Scan iterates over keys in the range [start, end) in lexicographic order.

func (*MemoryIndexStore) ScanPrefix

func (s *MemoryIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error

ScanPrefix iterates over all keys with the given prefix.

func (*MemoryIndexStore) Set

func (s *MemoryIndexStore) Set(key []byte, value []byte) error

Set stores an index entry.

type MemoryKVStore

type MemoryKVStore struct {
	// contains filtered or unexported fields
}

MemoryKVStore is an in-memory implementation of store.KVStore.

func NewMemoryKVStore

func NewMemoryKVStore() *MemoryKVStore

NewMemoryKVStore creates a new in-memory KV store.

func (*MemoryKVStore) Close

func (s *MemoryKVStore) Close() error

Close releases resources.

func (*MemoryKVStore) Delete

func (s *MemoryKVStore) Delete(key string) error

Delete removes a key.

func (*MemoryKVStore) Get

func (s *MemoryKVStore) Get(key string) ([]byte, error)

Get retrieves the value for a key.

func (*MemoryKVStore) ScanPrefix

func (s *MemoryKVStore) ScanPrefix(prefix string, fn func(key string, value []byte) bool) error

ScanPrefix iterates over all keys with the given prefix in sorted order.

func (*MemoryKVStore) Set

func (s *MemoryKVStore) Set(key string, value []byte) error

Set stores a key-value pair.

type MsgpackSerializer

type MsgpackSerializer struct{}

func NewMsgpackSerializer

func NewMsgpackSerializer() *MsgpackSerializer

func (*MsgpackSerializer) Marshal

func (s *MsgpackSerializer) Marshal(e *entity.Entity) ([]byte, error)

func (*MsgpackSerializer) Unmarshal

func (s *MsgpackSerializer) Unmarshal(data []byte, e *entity.Entity) error

type Op

type Op uint8

Op represents a filter operation.

const (
	OpEq       Op = iota // Equal
	OpLt                 // Less than
	OpLte                // Less than or equal
	OpGt                 // Greater than
	OpGte                // Greater than or equal
	OpContains           // Array contains element
)

type Query

type Query struct {
	EntityType  string
	Filters     []FieldFilter
	TimeRange   *TimeRange
	Limit       int
	Reverse     bool // If true, return results in reverse chronological order (newest first)
	AllVersions bool // If false (default), only return entities whose latest version matches filters; if true, return all matching historical versions
	MatchAny    bool // If true, use OR semantics (match any filter); if false (default), use AND semantics (match all filters)
}

Query defines a query against the entity store.

type RetentionConfig

type RetentionConfig struct {
	Interval   time.Duration // How often to run cleanup (default: 1h)
	BatchSize  int           // Max entities to delete per batch (default: 1000)
	BatchDelay time.Duration // Delay between batches to avoid blocking writes (default: 100ms)
}

RetentionConfig holds configuration for the retention worker.

func DefaultRetentionConfig

func DefaultRetentionConfig() RetentionConfig

DefaultRetentionConfig returns the default retention configuration.

type RetentionWorker

type RetentionWorker struct {
	// contains filtered or unexported fields
}

RetentionWorker runs periodic cleanup of expired entities.

func NewRetentionWorker

func NewRetentionWorker(store *EntityStore, registry *index.Registry, config RetentionConfig) *RetentionWorker

NewRetentionWorker creates a new retention worker.

func (*RetentionWorker) Start

func (w *RetentionWorker) Start()

Start begins the background cleanup loop.

func (*RetentionWorker) Stop

func (w *RetentionWorker) Stop(ctx context.Context) error

Stop gracefully stops the retention worker. It waits for the current cleanup cycle to complete or until ctx is cancelled.

type Serializer

type Serializer interface {
	Marshal(e *entity.Entity) ([]byte, error)
	Unmarshal(data []byte, e *entity.Entity) error
}

Serializer handles encoding and decoding of entities.

type TimeRange

type TimeRange struct {
	From int64 // Unix nanoseconds (inclusive)
	To   int64 // Unix nanoseconds (inclusive)
}

TimeRange specifies a time range for queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL