oshodi

package module
v0.0.0-...-47906c9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2026 License: MIT Imports: 6 Imported by: 0

README

Oshodi

A high-performance embedded key-value store for Go, designed to work seamlessly with Agbero load balancer and Keeper.

Features

  • Blazing fast: Lock-free architecture with sharded writes and zero-allocation paths
  • Embedded: No external dependencies, runs in-process
  • Persistence: Memory-mapped files with optional WAL
  • Buckets: Namespaced collections for data isolation
  • Pub/Sub: Built-in message passing between components
  • Analytics: HyperLogLog cardinality estimation and Count-Min Sketch frequency tracking
  • Compression: Optional zstd compression for large values
  • Concurrent: Optimized for high concurrency with sharded maps

Quick Start

package main

import (
    "fmt"
    "log"
    
    "github.com/agberohq/oshodi"
)

func main() {
    // Open or create a database
    db, err := oshodi.Open("data.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    
    // Get a bucket
    users := db.Bucket("users")
    
    // Store data
    err = users.Set([]byte("alice"), []byte(`{"name":"Alice","email":"alice@example.com"}`))
    if err != nil {
        log.Fatal(err)
    }
    
    // Retrieve data
    value, err := users.Get([]byte("alice"))
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Println(string(value))
    
    // Scan all keys
    users.Scan(func(key, value []byte) bool {
        fmt.Printf("%s: %s\n", key, value)
        return true // continue scanning
    })
}

Configuration

db, err := oshodi.Open("data.db",
    oshodi.WithShardCount(128),
    oshodi.WithCacheSize(10000),
    oshodi.WithCompression(4096, 5),  // compress values >4KB
    oshodi.WithBloomFilter(1_000_000, 0.01),
    oshodi.WithCardinality(14),        // enable key count estimation
)

Pub/Sub Example

// Subscribe to a channel
unsub := db.Subscribe("events", func(payload []byte) {
    fmt.Printf("Received: %s\n", payload)
})
defer unsub()

// Publish to a channel
db.Publish("events", []byte("hello world"))

Bucket-Scoped Operations

bucket := db.Bucket("mybucket")

// Register a handler
bucket.RegisterHandler("echo", func(req []byte) ([]byte, error) {
    return req, nil
})

// Call the handler
response, _ := bucket.Call("echo", []byte("ping"))

Performance

Oshodi is optimized for:

  • Low latency: Lock-free data structures throughout
  • High throughput: Sharded writers, batch operations
  • Memory efficiency: Memory-mapped files, configurable cache
  • Almost Zero allocations: Hot paths avoid heap allocations

License

MIT

Author

Agbero HQ

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrClosed   = errors.New("oshodi: database closed")
	ErrKeyEmpty = errors.New("oshodi: key cannot be empty")
)

Functions

This section is empty.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket represents a namespaced collection

func (*Bucket) Call

func (b *Bucket) Call(name string, payload []byte) ([]byte, error)

Call invokes a bucket-scoped handler

func (*Bucket) CallDirect

func (b *Bucket) CallDirect(name string, payload []byte) ([]byte, error)

CallDirect invokes a bucket-scoped handler in current goroutine

func (*Bucket) Delete

func (b *Bucket) Delete(key []byte) error

Delete removes a key

func (*Bucket) EstimatedKeyCount

func (b *Bucket) EstimatedKeyCount() uint64

EstimatedKeyCount returns estimated unique keys in this bucket

func (*Bucket) Get

func (b *Bucket) Get(key []byte) ([]byte, error)

Get retrieves a value by key

func (*Bucket) GetAll

func (b *Bucket) GetAll() ([]KV, error)

GetAll returns all key-value pairs in the bucket

func (*Bucket) GetAllSubscriptionIDs

func (b *Bucket) GetAllSubscriptionIDs(channel string) []int64

GetAllSubscriptionIDs returns all subscription IDs on a channel

func (*Bucket) GetKeyFrequency

func (b *Bucket) GetKeyFrequency(key []byte) uint64

GetKeyFrequency returns estimated frequency of a key

func (*Bucket) GetSubscriptionCount

func (b *Bucket) GetSubscriptionCount(channel string) int

GetSubscriptionCount returns number of subscribers on a channel

func (*Bucket) Has

func (b *Bucket) Has(key []byte) bool

Has checks if a key exists

func (*Bucket) Publish

func (b *Bucket) Publish(channel string, payload []byte) (int, error)

Publish sends a message to a channel

func (*Bucket) RegisterHandler

func (b *Bucket) RegisterHandler(name string, fn func([]byte) ([]byte, error))

RegisterHandler registers a bucket-scoped handler

func (*Bucket) Scan

func (b *Bucket) Scan(fn func(key, value []byte) bool) error

Scan iterates over all key-value pairs without copying

func (*Bucket) Set

func (b *Bucket) Set(key, value []byte) error

Set stores a key-value pair

func (*Bucket) Subscribe

func (b *Bucket) Subscribe(channel string, fn func([]byte)) func()

Subscribe registers a callback for a channel

func (*Bucket) SubscribeWithID

func (b *Bucket) SubscribeWithID(channel string, fn func([]byte)) (int64, func())

SubscribeWithID registers a callback and returns subscription ID

func (*Bucket) UnregisterHandler

func (b *Bucket) UnregisterHandler(name string)

UnregisterHandler removes a bucket-scoped handler

func (*Bucket) UnsubscribeAll

func (b *Bucket) UnsubscribeAll(channel string)

UnsubscribeAll removes all subscribers from a channel

type Config

type Config struct {
	// Basic settings
	Path string

	// Performance tuning
	ShardCount       int
	QueueCapacity    int
	WriterBufferSize int
	InitialFileSize  int64
	CacheSize        int
	PoolSize         int

	// Batching
	BatchMinSize int
	BatchMaxSize int
	BatchTimeout time.Duration
	FlushTimeout time.Duration

	// Bloom filter
	BloomExpectedItems     uint
	BloomFalsePositiveRate float64

	// Compression
	EnableCompression    bool
	CompressionThreshold int
	CompressionLevel     int

	// Compaction
	CompactionMinItems      int
	CompactionFragmentation float64
	CompactionInterval      time.Duration

	// Analytics
	EnableCardinality    bool
	CardinalityPrecision uint8 // 10-16, higher = more accurate
	EnableFrequency      bool  // Enable Count-Min Sketch

	// Logging
	Logger       *ll.Logger
	JackPool     *jack.Pool
	JackDoctor   *jack.Doctor
	JackLifetime *jack.Lifetime

	WALMaxBufSize int
	DisableWAL    bool
}

Config holds database configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a config with sensible defaults

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is the main database handle

func Open

func Open(path string, opts ...Option) (*DB, error)

Open creates or opens a database

func (*DB) Bucket

func (db *DB) Bucket(name string) *Bucket

Bucket returns a handle to a named bucket

func (*DB) Call

func (db *DB) Call(name string, payload []byte) ([]byte, error)

Call invokes a handler with timeout protection

func (*DB) CallDirect

func (db *DB) CallDirect(name string, payload []byte) ([]byte, error)

CallDirect invokes a handler in current goroutine

func (*DB) Close

func (db *DB) Close() error

Close gracefully shuts down the database

func (*DB) Compact

func (db *DB) Compact() error

Compact triggers manual compaction

func (*DB) CompactIfNeeded

func (db *DB) CompactIfNeeded() error

CompactIfNeeded checks and compacts if thresholds are met

func (*DB) EstimatedKeyCount

func (db *DB) EstimatedKeyCount() uint64

EstimatedKeyCount returns estimated unique keys in the database

func (*DB) Flush

func (db *DB) Flush() error

Flush forces all pending writes to disk

func (*DB) GetAllSubscriptionIDs

func (db *DB) GetAllSubscriptionIDs(channel string) []int64

GetAllSubscriptionIDs returns all subscription IDs on a channel

func (*DB) GetSubscriptionCount

func (db *DB) GetSubscriptionCount(channel string) int

GetSubscriptionCount returns number of subscribers on a channel

func (*DB) Publish

func (db *DB) Publish(channel string, payload []byte) (int, error)

Publish sends a message to a channel

func (*DB) RegisterHandler

func (db *DB) RegisterHandler(name string, fn func([]byte) ([]byte, error))

RegisterHandler registers a request-response handler

func (*DB) Size

func (db *DB) Size() int64

Size returns the current logical size of the database

func (*DB) Stats

func (db *DB) Stats() map[string]interface{}

Stats returns database statistics

func (*DB) Subscribe

func (db *DB) Subscribe(channel string, fn func([]byte)) func()

Subscribe registers a callback for a channel

func (*DB) SubscribeWithID

func (db *DB) SubscribeWithID(channel string, fn func([]byte)) (int64, func())

SubscribeWithID registers a callback and returns subscription ID

func (*DB) UnregisterHandler

func (db *DB) UnregisterHandler(name string)

UnregisterHandler removes a request-response handler

func (*DB) UnsubscribeAll

func (db *DB) UnsubscribeAll(channel string)

UnsubscribeAll removes all subscribers from a channel

type KV

type KV struct {
	Key   []byte
	Value []byte
}

KV represents a key-value pair

type Option

type Option func(*Config)

Option is a functional option for configuring the database

func WithBatchSettings

func WithBatchSettings(minSize, maxSize int, timeout time.Duration) Option

WithBatchSettings configures batching behavior

func WithBloomFilter

func WithBloomFilter(expectedItems uint, falsePositiveRate float64) Option

WithBloomFilter configures the Bloom filter

func WithCacheSize

func WithCacheSize(size int) Option

WithCacheSize sets the hot cache size

func WithCardinality

func WithCardinality(precision uint8) Option

WithCardinality enables cardinality tracking with HyperLogLog

func WithCompaction

func WithCompaction(minItems int, fragmentation float64, interval time.Duration) Option

WithCompaction configures compaction thresholds

func WithCompression

func WithCompression(threshold int, level int) Option

WithCompression enables compression

func WithDoctor

func WithDoctor(doctor *jack.Doctor) Option

WithDoctor injects an existing health monitor

func WithFlushTimeout

func WithFlushTimeout(timeout time.Duration) Option

WithFlushTimeout sets the flush timeout

func WithFrequency

func WithFrequency(enable bool) Option

WithFrequency enables frequency tracking with Count-Min Sketch

func WithInitialSize

func WithInitialSize(size int64) Option

WithInitialSize sets the initial file size

func WithJackPool

func WithJackPool(pool *jack.Pool) Option

WithJackPool injects an existing worker pool so the DB doesn't spawn its own

func WithLifetime

func WithLifetime(lifetime *jack.Lifetime) Option

WithLifetime injects an existing lifecycle manager for TTLs and background tasks

func WithLogger

func WithLogger(logger *ll.Logger) Option

WithLogger sets the logger

func WithPoolSize

func WithPoolSize(size int) Option

WithPoolSize sets the worker pool size

func WithQueueCapacity

func WithQueueCapacity(cap int) Option

WithQueueCapacity sets the write queue capacity

func WithShardCount

func WithShardCount(count int) Option

WithShardCount sets the number of shards for concurrency

func WithWALBufferSize

func WithWALBufferSize(size int) Option

WithWALBufferSize options.go - Add WAL options:

func WithWALDisabled

func WithWALDisabled(disable bool) Option

WithWALDisabled disables the Write-Ahead Log

func WithWriterBufferSize

func WithWriterBufferSize(size int) Option

WithWriterBufferSize sets the writer buffer size

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL