Documentation
¶
Index ¶
- Variables
- type BadgerIndexStore
- func (s *BadgerIndexStore) Close() error
- func (s *BadgerIndexStore) Delete(key []byte) error
- func (s *BadgerIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error
- func (s *BadgerIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error
- func (s *BadgerIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error
- func (s *BadgerIndexStore) Set(key []byte, value []byte) error
- type BadgerKVStore
- type EntityStore
- func (s *EntityStore) Close(ctx context.Context) error
- func (s *EntityStore) DeleteEntity(entityType, entityID string) error
- func (s *EntityStore) DeleteExpiredBatch(entityType string, cutoffNS int64, limit int) (int, error)
- func (s *EntityStore) DeleteVersion(e *entity.Entity) error
- func (s *EntityStore) Get(entityType, entityID string) (*entity.Entity, error)
- func (s *EntityStore) GetAt(entityType, entityID string, asOfTime int64) (*entity.Entity, error)
- func (s *EntityStore) GetHistory(entityType, entityID string, opts *HistoryOptions) ([]*entity.Entity, error)
- func (s *EntityStore) GetVersion(entityType, entityID string, timestamp int64) (*entity.Entity, error)
- func (s *EntityStore) Query(q *Query) ([]*entity.Entity, error)
- func (s *EntityStore) Reindex(entityType string) error
- func (s *EntityStore) Write(e *entity.Entity) error
- type FieldFilter
- type HistoryOptions
- type IndexStore
- type JSONSerializer
- type KVStore
- type MemoryIndexStore
- func (s *MemoryIndexStore) Close() error
- func (s *MemoryIndexStore) Delete(key []byte) error
- func (s *MemoryIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error
- func (s *MemoryIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error
- func (s *MemoryIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error
- func (s *MemoryIndexStore) Set(key []byte, value []byte) error
- type MemoryKVStore
- type MsgpackSerializer
- type Op
- type Query
- type RetentionConfig
- type RetentionWorker
- type Serializer
- type TimeRange
Constants ¶
This section is empty.
Variables ¶
var (
ErrNotFound = errors.New("key not found")
)
Functions ¶
This section is empty.
Types ¶
type BadgerIndexStore ¶
type BadgerIndexStore struct {
// contains filtered or unexported fields
}
BadgerIndexStore is a BadgerDB implementation of store.IndexStore.
func NewBadgerIndexStore ¶
func NewBadgerIndexStore(db *badger.DB) *BadgerIndexStore
NewBadgerIndexStore creates a new BadgerDB-backed index store.
func (*BadgerIndexStore) Close ¶
func (s *BadgerIndexStore) Close() error
Close closes the badger db connection if it's not already closed.
func (*BadgerIndexStore) Delete ¶
func (s *BadgerIndexStore) Delete(key []byte) error
Delete removes an index entry.
func (*BadgerIndexStore) ReverseScan ¶
func (s *BadgerIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error
ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.
func (*BadgerIndexStore) Scan ¶
func (s *BadgerIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error
Scan iterates over keys in the range [start, end) in lexicographic order.
func (*BadgerIndexStore) ScanPrefix ¶
func (s *BadgerIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error
ScanPrefix iterates over all keys with the given prefix.
type BadgerKVStore ¶
type BadgerKVStore struct {
// contains filtered or unexported fields
}
BadgerKVStore is a BadgerDB implementation of store.KVStore.
func NewBadgerKVStore ¶
func NewBadgerKVStore(db *badger.DB) *BadgerKVStore
NewBadgerKVStore creates a new BadgerDB-backed KV store.
func (*BadgerKVStore) Close ¶
func (s *BadgerKVStore) Close() error
Close closes the badger db connection if it's not already closed.
func (*BadgerKVStore) Delete ¶
func (s *BadgerKVStore) Delete(key string) error
Delete removes a key.
func (*BadgerKVStore) Get ¶
func (s *BadgerKVStore) Get(key string) ([]byte, error)
Get retrieves the value for a key.
func (*BadgerKVStore) ScanPrefix ¶
func (s *BadgerKVStore) ScanPrefix(prefix string, fn func(key string, value []byte) bool) error
ScanPrefix iterates over all keys with the given prefix.
type EntityStore ¶
type EntityStore struct {
// contains filtered or unexported fields
}
EntityStore is the main entry point for entity storage and retrieval.
func NewEmbeddedStore ¶
func NewEmbeddedStore(cfg *config.Config) (*EntityStore, error)
func NewEntityStore ¶
func NewEntityStore(kv KVStore, indexStore IndexStore, registry *index.Registry, serializer Serializer) (*EntityStore, error)
NewEntityStore creates a new EntityStore.
func NewMemoryStore ¶
func NewMemoryStore(cfg *config.Config) (*EntityStore, error)
func (*EntityStore) Close ¶
func (s *EntityStore) Close(ctx context.Context) error
Close releases resources held by the store.
func (*EntityStore) DeleteEntity ¶
func (s *EntityStore) DeleteEntity(entityType, entityID string) error
DeleteEntity removes ALL versions of an entity. TODO: improve error handling in this function
func (*EntityStore) DeleteExpiredBatch ¶
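func (s *EntityStore) DeleteExpiredBatch(entityType string, cutoffNS int64, limit int) (int, error)
DeleteExpiredBatch deletes up to limit expired entity versions (timestamps before cutoffNS). Returns the number of versions deleted.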
func (*EntityStore) DeleteVersion ¶
func (s *EntityStore) DeleteVersion(e *entity.Entity) error
DeleteVersion removes a specific version of an entity and all its index entries. If the deleted version is the latest one, the _latest indexes are updated to point to the new latest version. TODO: improve error handling in this function.
func (*EntityStore) Get ¶
func (s *EntityStore) Get(entityType, entityID string) (*entity.Entity, error)
Get retrieves the latest version of an entity by type and ID. Uses reverse scan of _by_id index to find the most recent timestamp.
func (*EntityStore) GetAt ¶
func (s *EntityStore) GetAt(entityType, entityID string, asOfTime int64) (*entity.Entity, error)
GetAt retrieves the latest version of an entity as of a specific point in time. Returns the most recent version where timestamp <= asOfTime.
func (*EntityStore) GetHistory ¶
func (s *EntityStore) GetHistory(entityType, entityID string, opts *HistoryOptions) ([]*entity.Entity, error)
GetHistory retrieves all versions of an entity.
func (*EntityStore) GetVersion ¶
func (s *EntityStore) GetVersion(entityType, entityID string, timestamp int64) (*entity.Entity, error)
GetVersion retrieves a specific version of an entity by type, ID, and timestamp.
func (*EntityStore) Query ¶
func (s *EntityStore) Query(q *Query) ([]*entity.Entity, error)
Query executes a query and returns matching entities. TODO: figure out multi-tenancy within the database
func (*EntityStore) Reindex ¶
func (s *EntityStore) Reindex(entityType string) error
Reindex rebuilds all indexes for a given entity type. This clears existing indexes and re-indexes all entities from the KV store.
type FieldFilter ¶
FieldFilter specifies a filter on a single field.
type HistoryOptions ¶
type HistoryOptions struct {
TimeRange *TimeRange
Limit int
Reverse bool // true = newest first (descending timestamp)
}
HistoryOptions configures a GetHistory query.
type IndexStore ¶
type IndexStore interface {
// Set stores an index entry (value is typically empty).
Set(key []byte, value []byte) error
// Delete removes an index entry.
Delete(key []byte) error
// Scan iterates over keys in the range [start, end) in lexicographic order.
// The callback receives each key; return false to stop iteration.
Scan(start, end []byte, fn func(key []byte) bool) error
// ScanPrefix iterates over all keys with the given prefix.
// The callback receives each key; return false to stop iteration.
ScanPrefix(prefix []byte, fn func(key []byte) bool) error
// ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.
// The callback receives each key; return false to stop iteration.
ReverseScan(start, end []byte, fn func(key []byte) bool) error
// Close releases any resources held by the store.
Close() error
}
IndexStore is the interface for index storage with ordered key scans. Keys follow the schema: {entity_type}/{field_name}/{field_value_bytes}/{timestamp_unix_ns}/{entity_id}
type JSONSerializer ¶
type JSONSerializer struct{}
JSONSerializer implements Serializer using JSON encoding.
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSON serializer.
type KVStore ¶
type KVStore interface {
// Get retrieves the value for a key. Returns ErrNotFound if key doesn't exist.
Get(key string) ([]byte, error)
// Set stores a key-value pair.
Set(key string, value []byte) error
// Delete removes a key.
Delete(key string) error
// ScanPrefix iterates over all keys with the given prefix.
// The callback receives each key and value; return false to stop iteration.
ScanPrefix(prefix string, fn func(key string, value []byte) bool) error
// Close releases any resources held by the store.
Close() error
}
KVStore is the interface for key-value storage of full entities. Keys follow the schema: {entity_type}:{entity_id}
type MemoryIndexStore ¶
type MemoryIndexStore struct {
// contains filtered or unexported fields
}
MemoryIndexStore is an in-memory implementation of store.IndexStore using a sorted slice.
func NewMemoryIndexStore ¶
func NewMemoryIndexStore() *MemoryIndexStore
NewMemoryIndexStore creates a new in-memory index store.
func (*MemoryIndexStore) Delete ¶
func (s *MemoryIndexStore) Delete(key []byte) error
Delete removes an index entry.
func (*MemoryIndexStore) ReverseScan ¶
func (s *MemoryIndexStore) ReverseScan(start, end []byte, fn func(key []byte) bool) error
ReverseScan iterates over keys in the range [start, end) in reverse lexicographic order.
func (*MemoryIndexStore) Scan ¶
func (s *MemoryIndexStore) Scan(start, end []byte, fn func(key []byte) bool) error
Scan iterates over keys in the range [start, end) in lexicographic order.
func (*MemoryIndexStore) ScanPrefix ¶
func (s *MemoryIndexStore) ScanPrefix(prefix []byte, fn func(key []byte) bool) error
ScanPrefix iterates over all keys with the given prefix.
type MemoryKVStore ¶
type MemoryKVStore struct {
// contains filtered or unexported fields
}
MemoryKVStore is an in-memory implementation of store.KVStore.
func NewMemoryKVStore ¶
func NewMemoryKVStore() *MemoryKVStore
NewMemoryKVStore creates a new in-memory KV store.
func (*MemoryKVStore) Delete ¶
func (s *MemoryKVStore) Delete(key string) error
Delete removes a key.
func (*MemoryKVStore) Get ¶
func (s *MemoryKVStore) Get(key string) ([]byte, error)
Get retrieves the value for a key.
func (*MemoryKVStore) ScanPrefix ¶
func (s *MemoryKVStore) ScanPrefix(prefix string, fn func(key string, value []byte) bool) error
ScanPrefix iterates over all keys with the given prefix in sorted order.
type MsgpackSerializer ¶
type MsgpackSerializer struct{}
func NewMsgpackSerializer ¶
func NewMsgpackSerializer() *MsgpackSerializer
type Query ¶
type Query struct {
EntityType string
Filters []FieldFilter
TimeRange *TimeRange
Limit int
Reverse bool // If true, return results in reverse chronological order (newest first)
AllVersions bool // If false (default), only return entities whose latest version matches filters; if true, return all matching historical versions
MatchAny bool // If true, use OR semantics (match any filter); if false (default), use AND semantics (match all filters)
}
Query defines a query against the entity store.
type RetentionConfig ¶
type RetentionConfig struct {
Interval time.Duration // How often to run cleanup (default: 1h)
BatchSize int // Max entities to delete per batch (default: 1000)
BatchDelay time.Duration // Delay between batches to avoid blocking writes (default: 100ms)
}
RetentionConfig holds configuration for the retention worker.
func DefaultRetentionConfig ¶
func DefaultRetentionConfig() RetentionConfig
DefaultRetentionConfig returns the default retention configuration.
type RetentionWorker ¶
type RetentionWorker struct {
// contains filtered or unexported fields
}
RetentionWorker runs periodic cleanup of expired entities.
func NewRetentionWorker ¶
func NewRetentionWorker(store *EntityStore, registry *index.Registry, config RetentionConfig) *RetentionWorker
NewRetentionWorker creates a new retention worker.
func (*RetentionWorker) Start ¶
func (w *RetentionWorker) Start()
Start begins the background cleanup loop.