engine

package
v0.0.0-...-47906c9
Published: Mar 31, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrKeyNotFound = errors.New("oshodi: key not found")
	ErrBufferFull  = errors.New("oshodi: write buffer full")
	ErrKeyEmpty    = errors.New("oshodi: key cannot be empty")
)

Functions

This section is empty.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket represents a namespaced collection of keys.

func (*Bucket) BatchSet

func (b *Bucket) BatchSet(entries []KV) error

BatchSet stores multiple key-value pairs in the bucket in a single batched write.

func (*Bucket) Call

func (b *Bucket) Call(name string, payload []byte) ([]byte, error)

Call invokes a bucket-scoped handler.

func (*Bucket) CallDirect

func (b *Bucket) CallDirect(name string, payload []byte) ([]byte, error)

CallDirect invokes a bucket-scoped handler directly.

func (*Bucket) Delete

func (b *Bucket) Delete(key []byte) error

Delete removes a key from the bucket.

func (*Bucket) EstimatedKeyCount

func (b *Bucket) EstimatedKeyCount() uint64

EstimatedKeyCount returns the estimated number of unique keys in this bucket.

func (*Bucket) Get

func (b *Bucket) Get(key []byte) ([]byte, error)

Get retrieves the value for a key in the bucket.

func (*Bucket) GetAll

func (b *Bucket) GetAll() ([]KV, error)

GetAll returns all key-value pairs in the bucket.

func (*Bucket) GetAllSubscriptionIDs

func (b *Bucket) GetAllSubscriptionIDs(channel string) []int64

GetAllSubscriptionIDs returns all subscription IDs on a channel.

func (*Bucket) GetKeyFrequency

func (b *Bucket) GetKeyFrequency(key []byte) uint64

GetKeyFrequency returns the estimated frequency of a key.

func (*Bucket) GetSubscriptionCount

func (b *Bucket) GetSubscriptionCount(channel string) int

GetSubscriptionCount returns the number of subscribers on a channel.

func (*Bucket) Publish

func (b *Bucket) Publish(channel string, payload []byte) (int, error)

Publish sends a message to a channel.

func (*Bucket) RegisterHandler

func (b *Bucket) RegisterHandler(name string, fn func([]byte) ([]byte, error))

RegisterHandler registers a bucket-scoped handler.

func (*Bucket) Scan

func (b *Bucket) Scan(fn func(key, value []byte) bool) error

Scan iterates over all key-value pairs without copying.

func (*Bucket) Set

func (b *Bucket) Set(key, value []byte) error

Set stores a key-value pair in the bucket.

func (*Bucket) Subscribe

func (b *Bucket) Subscribe(channel string, fn func([]byte)) func()

Subscribe registers a callback for a channel and returns an unsubscribe function.

func (*Bucket) SubscribeWithID

func (b *Bucket) SubscribeWithID(channel string, fn func([]byte)) (int64, func())

SubscribeWithID registers a callback for a channel. It returns the subscription ID and an unsubscribe function.

func (*Bucket) UnregisterHandler

func (b *Bucket) UnregisterHandler(name string)

UnregisterHandler removes a bucket-scoped handler.

func (*Bucket) UnsubscribeAll

func (b *Bucket) UnsubscribeAll(channel string)

UnsubscribeAll removes all subscribers from a channel.

type Config

type Config struct {
	FilePath                string
	BloomExpectedItems      uint
	BloomFalsePositiveRate  float64
	Logger                  *ll.Logger
	BatchMinSize            int
	BatchMaxSize            int
	BatchTimeout            time.Duration
	FlushTimeout            time.Duration
	SetQueueCapacity        int
	WriterBufferSize        int
	InitialFileSize         int64
	CompactionMinItems      int
	CompactionFragmentation float64
	CompactionInterval      time.Duration
	PoolSize                int
	EnableCompression       bool
	CompressionThreshold    int
	CompressionLevel        int
	CacheSize               int
	EnableCardinality       bool
	CardinalityPrecision    uint8
	EnableFrequency         bool
	SplitMutexShards        int
	JackPool                *jack.Pool
	JackDoctor              *jack.Doctor
	JackLifetime            *jack.Lifetime
	WALMaxBufSize           int
	DisableWAL              bool
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

func NewDB

func NewDB(config *Config) (*DB, error)

NewDB creates a database instance with the given configuration. It initialises storage, index, writer, cache, metrics, and WAL. If the WAL is enabled and contains data, it replays unflushed writes.

func (*DB) BatchSet

func (db *DB) BatchSet(entries []KV) error

BatchSet stores multiple key-value pairs efficiently. It writes all entries to the WAL (if enabled), then performs a single batch write to main storage under a read-lock, updating the index and cache.

func (*DB) Call

func (db *DB) Call(name string, payload []byte) ([]byte, error)

Call invokes a registered handler with a timeout. It returns the handler's response or an error on timeout.

func (*DB) CallDirect

func (db *DB) CallDirect(name string, payload []byte) ([]byte, error)

CallDirect invokes a registered handler without timeout protection. It runs the handler in the caller's goroutine.

func (*DB) Close

func (db *DB) Close() error

Close shuts down the database gracefully. It flushes pending writes, closes storage, and waits for background tasks.

func (*DB) Compact

func (db *DB) Compact() error

Compact rewrites the storage file, removing deleted records and reducing fragmentation. It creates a temporary file and copies all live records without holding any locks. It then takes the write-commit lock to drain in-flight writers, copies any records written during the first pass, atomically swaps the storage and index pointers, and renames the temp file synchronously.
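
The two-pass shape of this procedure can be sketched with an in-memory log. Everything below is a simplified illustration under assumptions: store, record, and liveRecords are hypothetical names, a slice stands in for the storage file, and writers take the commit lock in read mode so that the final swap can exclude them.

```go
package main

import (
	"fmt"
	"sync"
)

type record struct {
	key, value string
	deleted    bool // true marks a tombstone
}

// store is a hypothetical in-memory stand-in for the storage file.
type store struct {
	commit sync.RWMutex // writers hold RLock; the compaction swap holds Lock
	mu     sync.Mutex   // guards log
	log    []record
}

func (s *store) append(r record) {
	s.commit.RLock()
	defer s.commit.RUnlock()
	s.mu.Lock()
	s.log = append(s.log, r)
	s.mu.Unlock()
}

func (s *store) snapshot() []record {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]record(nil), s.log...)
}

// liveRecords keeps the newest record per key and drops tombstoned keys.
func liveRecords(log []record) []record {
	latest := make(map[string]int)
	for i, r := range log {
		latest[r.key] = i
	}
	var out []record
	for i, r := range log {
		if latest[r.key] == i && !r.deleted {
			out = append(out, r)
		}
	}
	return out
}

func (s *store) compact() {
	first := s.snapshot() // pass 1: commit lock not held, writers continue
	compacted := liveRecords(first)

	s.commit.Lock() // wait for in-flight writers, block new ones
	defer s.commit.Unlock()
	s.mu.Lock()
	defer s.mu.Unlock()
	// pass 2: carry over records appended while pass 1 was running
	compacted = append(compacted, s.log[len(first):]...)
	s.log = compacted // swap in the compacted log
}

func main() {
	s := &store{}
	s.append(record{key: "a", value: "1"})
	s.append(record{key: "a", value: "2"})
	s.append(record{key: "b", deleted: true})
	s.compact()
	fmt.Println(len(s.snapshot()), "record(s) after compaction")
}
```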

func (*DB) CompactIfNeeded

func (db *DB) CompactIfNeeded() error

CompactIfNeeded checks whether fragmentation exceeds the configured threshold. If compaction is required, it triggers a full compaction.
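
The threshold check might look like the following sketch. The fragmentation formula (the share of the file not occupied by live data) and the role of the minimum item count are assumptions made for illustration; needsCompaction is a hypothetical function whose parameters loosely mirror the CompactionMinItems and CompactionFragmentation fields of Config.

```go
package main

import "fmt"

// needsCompaction reports whether the dead fraction of the file exceeds threshold.
// Assumption: fragmentation = 1 - liveBytes/fileBytes, and small stores are skipped.
func needsCompaction(items int, liveBytes, fileBytes int64, minItems int, threshold float64) bool {
	if items < minItems || fileBytes <= 0 {
		return false
	}
	fragmentation := 1 - float64(liveBytes)/float64(fileBytes)
	return fragmentation > threshold
}

func main() {
	// 400 live bytes in a 1000-byte file is 60% fragmentation under this definition.
	fmt.Println(needsCompaction(1000, 400, 1000, 100, 0.5))
}
```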

func (*DB) Delete

func (db *DB) Delete(key []byte) error

Delete removes a key. It writes a tombstone to the WAL and main storage, then removes the entry from the index and cache.

func (*DB) EstimatedKeyCount

func (db *DB) EstimatedKeyCount() uint64

EstimatedKeyCount returns the approximate number of unique keys. It uses HyperLogLog if cardinality tracking is enabled.

func (*DB) Flush

func (db *DB) Flush() error

Flush forces all pending writes to disk and rotates the WAL. It syncs main storage first, then closes the old WAL, removes the file, and creates a fresh WAL for future writes. The sync happens before WAL deletion so a crash between the two leaves a recoverable WAL on disk.
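
The ordering described above (sync first, delete the WAL second) can be sketched with plain os calls. rotateWAL and demo are hypothetical helpers written for this illustration, and the file names are placeholders; the package's own file handling may differ.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rotateWAL syncs the main file, then closes, removes, and recreates the WAL.
// If the sync fails, the old WAL is left untouched so it can still be replayed.
func rotateWAL(mainFile, wal *os.File) (*os.File, error) {
	if err := mainFile.Sync(); err != nil {
		return wal, err
	}
	path := wal.Name()
	if err := wal.Close(); err != nil {
		return nil, err
	}
	if err := os.Remove(path); err != nil {
		return nil, err
	}
	return os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0o600)
}

// demo writes to a WAL, rotates it, and reports the size of the fresh WAL.
func demo() (int64, error) {
	dir, err := os.MkdirTemp("", "flush-sketch")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	mainFile, err := os.Create(filepath.Join(dir, "data.db"))
	if err != nil {
		return 0, err
	}
	defer mainFile.Close()

	wal, err := os.Create(filepath.Join(dir, "data.wal"))
	if err != nil {
		return 0, err
	}
	if _, err := wal.WriteString("pending write"); err != nil {
		wal.Close()
		return 0, err
	}

	fresh, err := rotateWAL(mainFile, wal)
	if err != nil {
		return 0, err
	}
	defer fresh.Close()

	info, err := fresh.Stat()
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

func main() {
	size, err := demo()
	fmt.Println("fresh WAL size:", size, "error:", err)
}
```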

func (*DB) Get

func (db *DB) Get(key []byte) ([]byte, error)

Get retrieves the value for a key. It checks the cache first, then the Bloom filter, the index, and finally reads from storage.

func (*DB) GetAllSubscriptionIDs

func (db *DB) GetAllSubscriptionIDs(channel string) []int64

GetAllSubscriptionIDs returns all subscription IDs on a channel.

func (*DB) GetBucket

func (db *DB) GetBucket(name string) *Bucket

GetBucket returns a namespaced bucket handle. Buckets isolate keys and provide their own set of methods.

func (*DB) GetSubscriptionCount

func (db *DB) GetSubscriptionCount(channel string) int

GetSubscriptionCount returns the number of subscribers on a channel.

func (*DB) Info

func (db *DB) Info() map[string]interface{}

Info returns a map of database statistics for external consumption. It is a convenience wrapper around Stats.

func (*DB) Publish

func (db *DB) Publish(channel string, payload []byte) (int, error)

Publish sends a payload to all subscribers of a channel. It returns the number of subscribers that received the message.

func (*DB) RegisterHandler

func (db *DB) RegisterHandler(name string, fn func([]byte) ([]byte, error))

RegisterHandler binds a name to a request-response handler. The handler will be invoked when Call is used with the same name.

func (*DB) Set

func (db *DB) Set(key, value []byte) error

Set stores a key-value pair. It writes to the WAL first if enabled, then to the main storage under a read-lock that blocks during compaction swaps, and finally updates the index and cache.

func (*DB) Size

func (db *DB) Size() int64

Size returns the current logical size of the database file.

func (*DB) Stats

func (db *DB) Stats() DBStats

Stats returns a snapshot of database statistics. It includes key count, file size, Bloom filter metrics, cache size, and estimated keys.

func (*DB) Subscribe

func (db *DB) Subscribe(channel string, fn func([]byte)) func()

Subscribe registers a callback for a channel. It returns an unsubscribe function to remove the subscription.
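
The Subscribe and Publish contract (an unsubscribe closure on one side, a delivery count on the other) can be sketched as below. bus and its methods are hypothetical, and synchronous delivery in the publisher's goroutine is an assumption of this sketch, not a documented property of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a hypothetical in-memory publish/subscribe registry.
type bus struct {
	mu     sync.RWMutex
	nextID int64
	subs   map[string]map[int64]func([]byte)
}

func newBus() *bus {
	return &bus{subs: make(map[string]map[int64]func([]byte))}
}

// subscribe registers fn and returns a closure that removes it again.
func (b *bus) subscribe(channel string, fn func([]byte)) func() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	id := b.nextID
	if b.subs[channel] == nil {
		b.subs[channel] = make(map[int64]func([]byte))
	}
	b.subs[channel][id] = fn
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs[channel], id)
	}
}

// publish delivers payload to every subscriber and returns how many were called.
func (b *bus) publish(channel string, payload []byte) int {
	b.mu.RLock()
	fns := make([]func([]byte), 0, len(b.subs[channel]))
	for _, fn := range b.subs[channel] {
		fns = append(fns, fn)
	}
	b.mu.RUnlock()
	// Callbacks run outside the lock so they may subscribe or unsubscribe.
	for _, fn := range fns {
		fn(payload)
	}
	return len(fns)
}

func main() {
	b := newBus()
	unsubscribe := b.subscribe("news", func(p []byte) { fmt.Println("got:", string(p)) })
	fmt.Println("delivered to", b.publish("news", []byte("hello")))
	unsubscribe()
	fmt.Println("delivered to", b.publish("news", []byte("again")))
}
```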

func (*DB) SubscribeWithID

func (db *DB) SubscribeWithID(channel string, fn func([]byte)) (int64, func())

SubscribeWithID registers a callback for a channel. It returns the subscription ID and an unsubscribe function.

func (*DB) UnregisterHandler

func (db *DB) UnregisterHandler(name string)

UnregisterHandler removes a previously registered handler.

func (*DB) UnsubscribeAll

func (db *DB) UnsubscribeAll(channel string)

UnsubscribeAll removes all subscribers from a channel.

type DBStats

type DBStats struct {
	NumKeys         int
	FileSizeBytes   int64
	LogicalOffset   int64
	BloomInsertions uint64
	BloomFPR        float64
	CacheSize       int
	EstimatedKeys   uint64
}

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index manages the key-to-offset mapping using a lock-free sharded map. It provides O(1) lookups with no mutex contention.
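
The sharding idea can be sketched as follows: a hash of the key selects a shard, so operations on different shards never touch the same map. For simplicity this sketch guards each shard with a sync.RWMutex, which is not the lock-free structure described above; shardedIndex and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type shard struct {
	mu sync.RWMutex
	m  map[string]int64 // key -> storage offset
}

// shardedIndex spreads keys over independent shards to reduce contention.
type shardedIndex struct{ shards []shard }

func newShardedIndex(n int) *shardedIndex {
	s := &shardedIndex{shards: make([]shard, n)}
	for i := range s.shards {
		s.shards[i].m = make(map[string]int64)
	}
	return s
}

// shardFor picks a shard from the FNV-1a hash of the key.
func (s *shardedIndex) shardFor(key []byte) *shard {
	h := fnv.New32a()
	h.Write(key)
	return &s.shards[h.Sum32()%uint32(len(s.shards))]
}

func (s *shardedIndex) set(key []byte, offset int64) {
	sh := s.shardFor(key)
	sh.mu.Lock()
	sh.m[string(key)] = offset
	sh.mu.Unlock()
}

func (s *shardedIndex) get(key []byte) (int64, bool) {
	sh := s.shardFor(key)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	off, ok := sh.m[string(key)]
	return off, ok
}

func (s *shardedIndex) len() int {
	total := 0
	for i := range s.shards {
		s.shards[i].mu.RLock()
		total += len(s.shards[i].m)
		s.shards[i].mu.RUnlock()
	}
	return total
}

func main() {
	idx := newShardedIndex(4)
	idx.set([]byte("a"), 10)
	idx.set([]byte("b"), 30)
	off, ok := idx.get([]byte("a"))
	fmt.Println(off, ok, idx.len())
}
```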

func NewIndex

func NewIndex(expectedItems uint, falsePositiveRate float64) *Index

NewIndex creates a new index with the given Bloom filter parameters.

func (*Index) BloomFPR

func (idx *Index) BloomFPR() float64

BloomFPR returns the current false positive rate of the Bloom filter.

func (*Index) BloomInsertions

func (idx *Index) BloomInsertions() uint64

BloomInsertions returns the number of insertions into the Bloom filter.

func (*Index) Delete

func (idx *Index) Delete(key []byte)

Delete removes a key from the index.

func (*Index) Get

func (idx *Index) Get(key []byte) (int64, bool)

Get returns the offset for a key. The boolean indicates existence.

func (*Index) Len

func (idx *Index) Len() int

Len returns the total number of keys.

func (*Index) MaybeHas

func (idx *Index) MaybeHas(key []byte) bool

MaybeHas checks the Bloom filter for potential existence.

func (*Index) Range

func (idx *Index) Range(fn func(key []byte, offset int64) bool)

Range iterates over all keys in arbitrary order. The function receives a copy of the key and its offset. Return false from the callback to stop iteration.
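
The callback contract (copy of the key, early stop on false) can be sketched over a plain map. rangeOffsets is a hypothetical function written for this illustration and does not use the package's Index type.

```go
package main

import "fmt"

// rangeOffsets calls fn for each entry until fn returns false.
// The []byte conversion hands the callback its own copy of the key.
func rangeOffsets(m map[string]int64, fn func(key []byte, offset int64) bool) {
	for k, off := range m {
		if !fn([]byte(k), off) {
			return
		}
	}
}

func main() {
	offsets := map[string]int64{"a": 0, "b": 64, "c": 128}
	visited := 0
	rangeOffsets(offsets, func(key []byte, offset int64) bool {
		visited++
		return visited < 2 // stop after the second entry
	})
	fmt.Println("visited:", visited)
}
```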

func (*Index) Set

func (idx *Index) Set(key []byte, offset int64)

Set inserts or updates a key with its storage offset.

type KV

type KV struct {
	Key   []byte
	Value []byte
}

KV represents a key-value pair.

type ShardedWriter

type ShardedWriter struct {
	// contains filtered or unexported fields
}

ShardedWriter parallelizes CPU-bound encoding (compression, formatting) across shards. It relies on the lock-free Offset Reservation Pattern in storage.File for actual disk writes.
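
A general form of offset reservation is sketched below: each writer atomically claims a byte range with a single add, then writes into that range with WriteAt, so concurrent writers never overlap. reservingFile and demo are hypothetical; how storage.File implements the pattern is not shown in this documentation. The sketch needs Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"fmt"
	"os"
	"sync"
	"sync/atomic"
)

// reservingFile hands out disjoint byte ranges with an atomic counter.
type reservingFile struct {
	f    *os.File
	next atomic.Int64 // next free logical offset
}

// write reserves len(p) bytes, then writes p at the reserved offset.
func (r *reservingFile) write(p []byte) (int64, error) {
	off := r.next.Add(int64(len(p))) - int64(len(p))
	_, err := r.f.WriteAt(p, off)
	return off, err
}

// demo runs n concurrent 4-byte writes and returns the offsets and file size.
func demo(n int) (map[int64]bool, int64, error) {
	tmp, err := os.CreateTemp("", "reserve-*.dat")
	if err != nil {
		return nil, 0, err
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()

	r := &reservingFile{f: tmp}
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards seen only; the write path itself takes no lock
		seen = make(map[int64]bool)
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			off, err := r.write([]byte("abcd"))
			if err != nil {
				return
			}
			mu.Lock()
			seen[off] = true
			mu.Unlock()
		}()
	}
	wg.Wait()

	info, err := tmp.Stat()
	if err != nil {
		return nil, 0, err
	}
	return seen, info.Size(), nil
}

func main() {
	offsets, size, err := demo(8)
	fmt.Println(len(offsets), "distinct offsets, file size", size, "error:", err)
}
```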

func NewShardedWriter

func NewShardedWriter(cfg ShardedWriterConfig) (*ShardedWriter, error)

NewShardedWriter creates a ShardedWriter from the given configuration.

func (*ShardedWriter) BatchWriteRecord

func (sw *ShardedWriter) BatchWriteRecord(entries []KV) ([]int64, error)

BatchWriteRecord improves throughput by merging multiple records into a single WriteAtomic system call.

func (*ShardedWriter) Close

func (sw *ShardedWriter) Close() error

Close flushes data and closes the writer.

func (*ShardedWriter) Flush

func (sw *ShardedWriter) Flush() error

Flush ensures the OS page cache is synced to the underlying disk.

func (*ShardedWriter) Offset

func (sw *ShardedWriter) Offset() int64

Offset returns the exact current logical file offset from the storage file.

func (*ShardedWriter) Sync

func (sw *ShardedWriter) Sync() error

Sync is an alias for Flush to satisfy API requirements.

func (*ShardedWriter) WriteRecord

func (sw *ShardedWriter) WriteRecord(key, value []byte) (int64, error)

WriteRecord performs zero-allocation encoding and lock-free offset reservation.

func (*ShardedWriter) WriteTombstone

func (sw *ShardedWriter) WriteTombstone(key []byte) error

WriteTombstone creates a deletion marker and appends it to storage without taking a lock.

type ShardedWriterConfig

type ShardedWriterConfig struct {
	ShardCount int
	BufferSize int
	Pool       *jack.Pool
	Store      *storage.File
}
