Documentation
¶
Index ¶
- Variables
- type Bucket
- func (b *Bucket) BatchSet(entries []KV) error
- func (b *Bucket) Call(name string, payload []byte) ([]byte, error)
- func (b *Bucket) CallDirect(name string, payload []byte) ([]byte, error)
- func (b *Bucket) Delete(key []byte) error
- func (b *Bucket) EstimatedKeyCount() uint64
- func (b *Bucket) Get(key []byte) ([]byte, error)
- func (b *Bucket) GetAll() ([]KV, error)
- func (b *Bucket) GetAllSubscriptionIDs(channel string) []int64
- func (b *Bucket) GetKeyFrequency(key []byte) uint64
- func (b *Bucket) GetSubscriptionCount(channel string) int
- func (b *Bucket) Publish(channel string, payload []byte) (int, error)
- func (b *Bucket) RegisterHandler(name string, fn func([]byte) ([]byte, error))
- func (b *Bucket) Scan(fn func(key, value []byte) bool) error
- func (b *Bucket) Set(key, value []byte) error
- func (b *Bucket) Subscribe(channel string, fn func([]byte)) func()
- func (b *Bucket) SubscribeWithID(channel string, fn func([]byte)) (int64, func())
- func (b *Bucket) UnregisterHandler(name string)
- func (b *Bucket) UnsubscribeAll(channel string)
- type Config
- type DB
- func (db *DB) BatchSet(entries []KV) error
- func (db *DB) Call(name string, payload []byte) ([]byte, error)
- func (db *DB) CallDirect(name string, payload []byte) ([]byte, error)
- func (db *DB) Close() error
- func (db *DB) Compact() error
- func (db *DB) CompactIfNeeded() error
- func (db *DB) Delete(key []byte) error
- func (db *DB) EstimatedKeyCount() uint64
- func (db *DB) Flush() error
- func (db *DB) Get(key []byte) ([]byte, error)
- func (db *DB) GetAllSubscriptionIDs(channel string) []int64
- func (db *DB) GetBucket(name string) *Bucket
- func (db *DB) GetSubscriptionCount(channel string) int
- func (db *DB) Info() map[string]interface{}
- func (db *DB) Publish(channel string, payload []byte) (int, error)
- func (db *DB) RegisterHandler(name string, fn func([]byte) ([]byte, error))
- func (db *DB) Set(key, value []byte) error
- func (db *DB) Size() int64
- func (db *DB) Stats() DBStats
- func (db *DB) Subscribe(channel string, fn func([]byte)) func()
- func (db *DB) SubscribeWithID(channel string, fn func([]byte)) (int64, func())
- func (db *DB) UnregisterHandler(name string)
- func (db *DB) UnsubscribeAll(channel string)
- type DBStats
- type Index
- func (idx *Index) BloomFPR() float64
- func (idx *Index) BloomInsertions() uint64
- func (idx *Index) Delete(key []byte)
- func (idx *Index) Get(key []byte) (int64, bool)
- func (idx *Index) Len() int
- func (idx *Index) MaybeHas(key []byte) bool
- func (idx *Index) Range(fn func(key []byte, offset int64) bool)
- func (idx *Index) Set(key []byte, offset int64)
- type KV
- type ShardedWriter
- func (sw *ShardedWriter) BatchWriteRecord(entries []KV) ([]int64, error)
- func (sw *ShardedWriter) Close() error
- func (sw *ShardedWriter) Flush() error
- func (sw *ShardedWriter) Offset() int64
- func (sw *ShardedWriter) Sync() error
- func (sw *ShardedWriter) WriteRecord(key, value []byte) (int64, error)
- func (sw *ShardedWriter) WriteTombstone(key []byte) error
- type ShardedWriterConfig
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket represents a namespaced collection
func (*Bucket) BatchSet ¶
BatchSet stores multiple key-value pairs efficiently in a single lock-free syscall.
func (*Bucket) CallDirect ¶
CallDirect invokes a bucket-scoped handler directly
func (*Bucket) EstimatedKeyCount ¶
EstimatedKeyCount returns estimated unique keys in this bucket
func (*Bucket) GetAllSubscriptionIDs ¶
GetAllSubscriptionIDs returns all subscription IDs on a channel
func (*Bucket) GetKeyFrequency ¶
GetKeyFrequency returns estimated frequency of a key
func (*Bucket) GetSubscriptionCount ¶
GetSubscriptionCount returns number of subscribers on a channel
func (*Bucket) RegisterHandler ¶
RegisterHandler registers a bucket-scoped handler
func (*Bucket) SubscribeWithID ¶
SubscribeWithID registers a callback and returns subscription ID
func (*Bucket) UnregisterHandler ¶
UnregisterHandler removes a bucket-scoped handler
func (*Bucket) UnsubscribeAll ¶
UnsubscribeAll removes all subscribers from a channel
type Config ¶
type Config struct {
FilePath string
BloomExpectedItems uint
BloomFalsePositiveRate float64
Logger *ll.Logger
BatchMinSize int
BatchMaxSize int
BatchTimeout time.Duration
FlushTimeout time.Duration
SetQueueCapacity int
WriterBufferSize int
InitialFileSize int64
CompactionMinItems int
CompactionFragmentation float64
CompactionInterval time.Duration
PoolSize int
EnableCompression bool
CompressionThreshold int
CompressionLevel int
CacheSize int
EnableCardinality bool
CardinalityPrecision uint8
EnableFrequency bool
SplitMutexShards int
JackPool *jack.Pool
JackDoctor *jack.Doctor
JackLifetime *jack.Lifetime
WALMaxBufSize int
DisableWAL bool
}
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
func NewDB ¶
NewDB creates a database instance with the given configuration. It initialises storage, index, writer, cache, metrics, and WAL. If the WAL is enabled and contains data, it replays unflushed writes.
func (*DB) BatchSet ¶
BatchSet stores multiple key-value pairs efficiently. It writes all entries to the WAL (if enabled), then performs a single batch write to main storage under a read-lock, updating the index and cache.
func (*DB) Call ¶
Call invokes a registered handler with a timeout. It returns the handler's response or an error on timeout.
func (*DB) CallDirect ¶
CallDirect invokes a registered handler without timeout protection. It runs the handler in the caller's goroutine.
func (*DB) Close ¶
Close shuts down the database gracefully. It flushes pending writes, closes storage, and waits for background tasks.
func (*DB) Compact ¶
Compact rewrites the storage file, removing deleted records and reducing fragmentation. It creates a temporary file, copies all live records without holding any locks, then takes the write-commit lock to drain in-flight writers, copies any records written during the first pass, atomically swaps the storage and index pointers, and renames the temp file synchronously.
func (*DB) CompactIfNeeded ¶
CompactIfNeeded checks whether fragmentation exceeds the configured threshold. If compaction is required, it triggers a full compaction.
func (*DB) Delete ¶
Delete removes a key. It writes a tombstone to the WAL and main storage, then removes the entry from the index and cache.
func (*DB) EstimatedKeyCount ¶
EstimatedKeyCount returns the approximate number of unique keys. It uses HyperLogLog if cardinality tracking is enabled.
func (*DB) Flush ¶
Flush forces all pending writes to disk and rotates the WAL. It syncs main storage first, then closes the old WAL, removes the file, and creates a fresh WAL for future writes. The sync happens before WAL deletion so a crash between the two leaves a recoverable WAL on disk.
func (*DB) Get ¶
Get retrieves the value for a key. It checks the cache first, then the Bloom filter, the index, and finally reads from storage.
func (*DB) GetAllSubscriptionIDs ¶
GetAllSubscriptionIDs returns all subscription IDs on a channel.
func (*DB) GetBucket ¶
GetBucket returns a namespaced bucket handle. Buckets isolate keys and provide their own set of methods.
func (*DB) GetSubscriptionCount ¶
GetSubscriptionCount returns the number of subscribers on a channel.
func (*DB) Info ¶
Info returns a map of database statistics for external consumption. It is a convenient wrapper around Stats.
func (*DB) Publish ¶
Publish sends a payload to all subscribers of a channel. It returns the number of subscribers that received the message.
func (*DB) RegisterHandler ¶
RegisterHandler binds a name to a request‑response handler. The handler will be invoked when Call is used with the same name.
func (*DB) Set ¶
Set stores a key-value pair. It writes to the WAL first if enabled, then to the main storage under a read-lock that blocks during compaction swaps, and finally updates the index and cache.
func (*DB) Stats ¶
Stats returns a snapshot of database statistics. It includes key count, file size, Bloom filter metrics, cache size, and estimated keys.
func (*DB) Subscribe ¶
Subscribe registers a callback for a channel. It returns an unsubscribe function to remove the subscription.
func (*DB) SubscribeWithID ¶
SubscribeWithID registers a callback and returns a subscription ID. It returns both the ID and an unsubscribe function.
func (*DB) UnregisterHandler ¶
UnregisterHandler removes a previously registered handler.
func (*DB) UnsubscribeAll ¶
UnsubscribeAll removes all subscribers from a channel.
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index manages key to offset mapping using a lock-free sharded map. It provides O(1) lookups with no mutex contention.
func (*Index) BloomInsertions ¶
BloomInsertions returns the number of insertions into the Bloom filter.
type ShardedWriter ¶
type ShardedWriter struct {
// contains filtered or unexported fields
}
ShardedWriter parallelizes CPU-bound encoding (compression, formatting) across shards. It relies on the lock-free Offset Reservation Pattern in storage.File for actual disk writes.
func NewShardedWriter ¶
func NewShardedWriter(cfg ShardedWriterConfig) (*ShardedWriter, error)
NewShardedWriter creates a highly optimized ShardedWriter.
func (*ShardedWriter) BatchWriteRecord ¶
func (sw *ShardedWriter) BatchWriteRecord(entries []KV) ([]int64, error)
BatchWriteRecord heavily optimizes throughput by merging multiple records into a single WriteAtomic OS Syscall.
func (*ShardedWriter) Close ¶
func (sw *ShardedWriter) Close() error
Close flushes data and closes the writer.
func (*ShardedWriter) Flush ¶
func (sw *ShardedWriter) Flush() error
Flush ensures the OS page cache is synced to the underlying disk.
func (*ShardedWriter) Offset ¶
func (sw *ShardedWriter) Offset() int64
Offset returns the exact current logical file offset from the storage file.
func (*ShardedWriter) Sync ¶
func (sw *ShardedWriter) Sync() error
Sync is an alias for Flush to satisfy API requirements.
func (*ShardedWriter) WriteRecord ¶
func (sw *ShardedWriter) WriteRecord(key, value []byte) (int64, error)
WriteRecord performs zero-allocation encoding and lock-free offset reservation.
func (*ShardedWriter) WriteTombstone ¶
func (sw *ShardedWriter) WriteTombstone(key []byte) error
WriteTombstone creates a deletion marker lock-free and appends it to storage.