bw

package module
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 10, 2026 License: MIT Imports: 25 Imported by: 0

README

bw

bw is a thin wrapper around BadgerDB that exposes a typed bucket API, plus a query engine that consumes github.com/rakunlabs/query expressions directly. URL query strings translate into Go-side filtering, sorting and pagination over Badger key prefixes.

graph LR
    A["URL query<br/>name=A&age>30"] --> B["query.Parse<br/>*query.Query"] --> C["bw engine<br/>Badger scan"]

Install

go get github.com/rakunlabs/bw

Quick start

package main

import (
    "context"
    "log"

    "github.com/rakunlabs/bw"
    "github.com/rakunlabs/query"
)

// One tag set drives both the bw schema (pk/index/unique flags) and
// the on-wire field name. The codec honours `bw:"-"` to skip a
// field. No codegen needed.
type User struct {
    ID    string `bw:"id,pk"`
    Name  string `bw:"name,index"`
    Email string `bw:"email,unique"`
    Age   int    `bw:"age,index"`
    Bio   string `bw:"-"`             // never serialized
}

func main() {
    db, err := bw.Open("/var/lib/myapp")
    if err != nil { log.Fatal(err) }
    defer db.Close()

    users, err := bw.RegisterBucket[User](db, "users")
    if err != nil { log.Fatal(err) }

    ctx := context.Background()

    err = users.InsertMany(ctx,
      []*User{
        {ID: "1", Name: "Kemal Sunal", Email: "a@x", Age: 30},
        {ID: "2", Name: "Tarık Akan", Email: "b@x", Age: 25},
      },
    )
    if err != nil { log.Fatal(err) }

    u, _ := users.Get(ctx, "1")
    log.Println(u.Name) // Kemal Sunal

    q, _ := query.Parse("name=Tarık Akan|age[gt]=29&_sort=-age&_limit=10")
    got, _ := users.Find(ctx, q)
    log.Println(got)
}

Tag flags

ID    string `bw:"id,pk"`           // primary key
Name  string `bw:"name,index"`      // ordered index, range/sort friendly
Email string `bw:"email,unique"`    // uniqueness constraint (lookup-style)
User  string `bw:"user,index,unique"` // both: indexed AND unique (allowed)
Tag   string `bw:"-"`               // skip

pk, index and unique parse independently; combine them freely.

Composite indexes and unique constraints

Use index:groupname or unique:groupname to combine multiple fields into a single index or unique constraint. Fields sharing the same group name are concatenated in struct declaration order.

type Location struct {
    ID      string `bw:"id,pk"`
    Country string `bw:"country,index:region"`         // composite index "region"
    City    string `bw:"city,index:region"`             // same group → key is (country, city)
    Code    string `bw:"code,unique:country_code"`      // composite unique "country_code"
    Prefix  string `bw:"prefix,unique:country_code"`    // same group → (code, prefix) must be unique together
    Name    string `bw:"name,index"`                    // plain single-field index (unchanged)
}

Composite indexes are used by the query planner when all constituent fields appear as equality conditions:

country=TR&city=Istanbul  → composite index seek on "region"
country=TR                → full scan (only 1 of 2 fields supplied)
country=TR&city=Istanbul&name=foo → composite seek + residual filter on name
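
The planner rule above can be sketched in a few lines. This is only an illustration of the stated behaviour, not bw's planner: `canSeek`, `compositeKey` and the NUL separator are assumptions made for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// compositeKey joins field values in struct declaration order.
// The NUL separator is an assumption of this sketch, not bw's key format.
func compositeKey(values ...string) string {
	return strings.Join(values, "\x00")
}

// canSeek reports whether every field of the composite group appears as an
// equality condition, which is the documented precondition for an index seek.
func canSeek(group []string, eq map[string]string) bool {
	for _, field := range group {
		if _, ok := eq[field]; !ok {
			return false
		}
	}
	return len(group) > 0
}

func main() {
	region := []string{"country", "city"}

	full := map[string]string{"country": "TR", "city": "Istanbul", "name": "foo"}
	partial := map[string]string{"country": "TR"}

	fmt.Println(canSeek(region, full))    // true: seek, then filter on name
	fmt.Println(canSeek(region, partial)) // false: full scan
	fmt.Printf("%q\n", compositeKey(full["country"], full["city"]))
}
```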

Composite unique constraints enforce that the combination of all grouped fields is unique across the bucket. Individual field values may repeat as long as the full tuple is distinct:

(Code="TR", Prefix="34") + (Code="TR", Prefix="06")  → OK
(Code="TR", Prefix="34") + (Code="TR", Prefix="34")  → ErrConflict (on different PKs)
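
A minimal in-memory sketch of the tuple rule, assuming a map from the (code, prefix) pair to the owning primary key. `uniqueGroup`, `reserve` and `errConflict` are hypothetical names that stand in for bw's internal bookkeeping and `bw.ErrConflict`.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict stands in for bw.ErrConflict in this sketch.
var errConflict = errors.New("unique constraint violated")

// uniqueGroup maps a composite (code, prefix) tuple to the pk that owns it.
type uniqueGroup map[[2]string]string

// reserve claims the tuple for pk. The same pk may re-reserve its own tuple
// (an overwrite); a different pk gets errConflict.
func (g uniqueGroup) reserve(pk, code, prefix string) error {
	key := [2]string{code, prefix}
	if owner, ok := g[key]; ok && owner != pk {
		return errConflict
	}
	g[key] = pk
	return nil
}

func main() {
	g := uniqueGroup{}
	fmt.Println(g.reserve("1", "TR", "34")) // <nil>
	fmt.Println(g.reserve("2", "TR", "06")) // <nil>
	fmt.Println(g.reserve("3", "TR", "34")) // unique constraint violated
}
```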

Schema evolution (adding/removing fields)

When you change a struct (add new fields, add/remove indexes), use WithVersion to tell RegisterBucket to auto-migrate:

// V1 — original schema.
type User struct {
    ID   string `bw:"id,pk"`
    Name string `bw:"name,index"`
}

// V2 — added Email (unique) and Age (indexed).
type User struct {
    ID    string `bw:"id,pk"`
    Name  string `bw:"name,index"`
    Email string `bw:"email,unique"`
    Age   int    `bw:"age,index"`
}
// Bump the version number each time you change the index/unique surface.
// RegisterBucket auto-migrates when stored version < provided version.
users, err := bw.RegisterBucket[User](db, "users", bw.WithVersion[User](2))
if err != nil {
    log.Fatal(err)
}

That's it. No manual two-step MigrateBucket call needed — just bump the version number when you change the struct.

What happens under the hood (incremental):

The migration does NOT drop every index and rebuild from scratch. It diffs the stored schema against the new one and touches only what changed:

  1. Fields that lost their index tag → only those index keys are deleted.
  2. Fields that lost their unique tag → only those unique keys are deleted.
  3. Fields that are newly indexed/unique → scans data and builds entries only for those fields.
  4. Fields whose flags are unchanged → left completely alone (no I/O).
  5. Updates the stored fingerprint, version, and manifest.
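
The diffing step can be pictured with the following sketch. It only models the decision of which fields to touch; the `flags`, `plan` and `diff` names are made up for illustration and say nothing about how bw stores its manifest.

```go
package main

import (
	"fmt"
	"sort"
)

// flags records which bw tag flags a field carries.
type flags struct{ index, unique bool }

// plan lists the fields an incremental migration would have to touch.
type plan struct{ dropIndex, dropUnique, build []string }

// diff compares the stored schema with the current one, field by field.
func diff(old, cur map[string]flags) plan {
	var p plan
	for name, o := range old {
		n := cur[name] // zero value when the field no longer exists
		if o.index && !n.index {
			p.dropIndex = append(p.dropIndex, name)
		}
		if o.unique && !n.unique {
			p.dropUnique = append(p.dropUnique, name)
		}
	}
	for name, n := range cur {
		o := old[name]
		if (n.index && !o.index) || (n.unique && !o.unique) {
			p.build = append(p.build, name)
		}
	}
	// Map iteration order is random, so sort for a stable plan.
	sort.Strings(p.dropIndex)
	sort.Strings(p.dropUnique)
	sort.Strings(p.build)
	return p
}

func main() {
	v1 := map[string]flags{"name": {index: true}}
	v2 := map[string]flags{
		"name":  {index: true},
		"email": {unique: true},
		"age":   {index: true},
	}
	p := diff(v1, v2)
	fmt.Println(p.build, len(p.dropIndex), len(p.dropUnique)) // [age email] 0 0
}
```

Fields whose flags match in both maps never appear in the plan, which corresponds to the "left completely alone" case in step 4.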

Rules:

  • Additive changes (new fields) are safe — old records get zero values.
  • Removing an index is safe — the stale index keys are cleaned up.
  • Changing the primary key field or its bw tag name requires a manual data migration (you'd need to re-key every record).
  • Zero-value unique fields (empty string, nil slice) are skipped during migration to avoid false conflicts on old records that lack the new field.
  • If you don't provide WithVersion, the old strict behavior applies (fingerprint mismatch = error). You can still call MigrateBucket explicitly in that case.

Defaults

bw.DefaultCacheSize int64 = 100 << 20   // 100 MiB block cache (Badger default is 256 MiB)
bw.DefaultLogSize   int64 = 100 << 20   // 100 MiB value-log file size (Badger default is 1 GiB)

These are package-level vars. Either change them at process start or use WithBadgerOptions to take full control.


Query syntax

bw consumes whatever query.Parse produces, so the operator set is identical to the upstream package:

| Operator | Meaning | Example |
| --- | --- | --- |
| eq (default) | equal | name=Alice |
| ne | not equal | name[ne]=Alice |
| gt, gte, lt, lte | numeric / lexicographic comparison | age[gte]=18 |
| like, ilike | SQL-style %/_ wildcards (i = case-insensitive) | name[like]=A%25 (URL-encode %) |
| nlike, nilike | negated LIKE / ILIKE | |
| in (implicit on ,) | membership | country=US,DE,FR |
| nin | not in | status[nin]=banned,deleted |
| is, not | IS NULL / IS NOT NULL | deleted_at[is]= |
| kv | JSONB-style containment | meta[kv]=eyJhIjoxfQ |
| jin, njin | array has any / none | tags[jin]=admin,editor |

Logical: & for AND, | for OR, () to group. Pagination: _sort=field,-other, _limit=N, _offset=N. Projection: _fields=id,name (returned as Query.Select).
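
To make the `field[op]=value` shape concrete, here is a small stand-alone sketch that splits a single term. It is not the upstream `query.Parse` implementation and ignores grouping, URL decoding and the `_sort`/`_limit` keys; `cond` and `parseCond` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// cond is one parsed comparison.
type cond struct{ field, op, value string }

// parseCond splits one "field[op]=value" term. A missing [op] means "eq",
// and a comma in an eq value is treated as an implicit "in" list.
func parseCond(term string) (cond, bool) {
	lhs, value, ok := strings.Cut(term, "=")
	if !ok || lhs == "" {
		return cond{}, false
	}
	field, op := lhs, "eq"
	if i := strings.IndexByte(lhs, '['); i > 0 && strings.HasSuffix(lhs, "]") {
		field, op = lhs[:i], lhs[i+1:len(lhs)-1]
	}
	if op == "eq" && strings.Contains(value, ",") {
		op = "in"
	}
	return cond{field: field, op: op, value: value}, true
}

func main() {
	for _, term := range []string{"name=Alice", "age[gte]=18", "country=US,DE,FR"} {
		c, _ := parseCond(term)
		fmt.Println(c.field, c.op, c.value)
	}
}
```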

Dot-paths work for nested values and slice indexing:

address.city=Berlin
items.0.name[like]=foo%25
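
One way to picture dot-path resolution is a walk over decoded maps and slices. This sketch assumes records decoded into `map[string]any` and `[]any`; how bw actually evaluates paths against its codec is not shown here, and `lookup` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// lookup follows a dot-path through nested maps and slices.
// Numeric segments index into slices; everything else is a map key.
func lookup(doc any, path string) (any, bool) {
	cur := doc
	for _, seg := range strings.Split(path, ".") {
		switch node := cur.(type) {
		case map[string]any:
			v, ok := node[seg]
			if !ok {
				return nil, false
			}
			cur = v
		case []any:
			i, err := strconv.Atoi(seg)
			if err != nil || i < 0 || i >= len(node) {
				return nil, false
			}
			cur = node[i]
		default:
			return nil, false // scalar reached before the path ended
		}
	}
	return cur, true
}

func main() {
	doc := map[string]any{
		"address": map[string]any{"city": "Berlin"},
		"items":   []any{map[string]any{"name": "foo"}},
	}
	fmt.Println(lookup(doc, "address.city")) // Berlin true
	fmt.Println(lookup(doc, "items.0.name")) // foo true
	fmt.Println(lookup(doc, "items.3.name")) // <nil> false
}
```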

Backup & restore

Every *bw.DB exposes backup, restore and version methods backed by Badger's streaming backup format.

// Current database version (monotonically increasing uint64).
ver := db.Version()

// Full backup.
var buf bytes.Buffer
since, _ := db.Backup(&buf, 0, false)

// Incremental backup (only entries newer than `since`).
db.Backup(&buf, since, false)

// Backup with deleted data (preserves delete markers for point-in-time).
db.Backup(&buf, 0, true)

// Point-in-time backup: only entries with version <= savedVersion.
db.BackupUntil(&buf, savedVersion)

// Restore into a (typically fresh) database.
db2.Restore(&buf)

| Method | Description |
| --- | --- |
| Backup(w, since, deletedData) | Incremental backup; set deletedData=true to preserve delete markers |
| BackupUntil(w, until) | Point-in-time backup up to a given version |
| Restore(r) | Load a backup into the database |
| Version() | Current max transaction version |
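
The value returned by Backup acts as a cursor for the next incremental run. The sketch below shows that pattern against a local `backuper` interface that mirrors the documented `Backup` signature; `fakeDB`, `snapshot` and `demo` are hypothetical and exist only so the example runs without a real database.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// backuper mirrors the documented (*bw.DB).Backup signature.
type backuper interface {
	Backup(w io.Writer, since uint64, deletedData bool) (uint64, error)
}

// fakeDB is a stand-in that "backs up" versions since+1 through head.
type fakeDB struct{ head uint64 }

func (f *fakeDB) Backup(w io.Writer, since uint64, _ bool) (uint64, error) {
	for v := since + 1; v <= f.head; v++ {
		if _, err := fmt.Fprintf(w, "v%d;", v); err != nil {
			return since, err
		}
	}
	return f.head, nil
}

// snapshot writes everything newer than since and returns the new cursor.
func snapshot(db backuper, w io.Writer, since uint64) (uint64, error) {
	next, err := db.Backup(w, since, false)
	if err != nil {
		return since, err // keep the old cursor so the next run retries
	}
	return next, nil
}

// demo takes a full backup, advances the database, then takes an increment.
func demo() (full, incr string, cursor uint64) {
	db := &fakeDB{head: 3}
	var a, b bytes.Buffer
	cursor, _ = snapshot(db, &a, 0)
	db.head = 5
	cursor, _ = snapshot(db, &b, cursor)
	return a.String(), b.String(), cursor
}

func main() {
	fmt.Println(demo()) // v1;v2;v3; v4;v5; 5
}
```

Writing each backup to its own writer, as above, keeps the full and incremental streams separate so they can be restored in order.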

Cluster mode

The cluster sub-package adds multi-node replication on top of bw using alan for UDP peer discovery and leader election.

go get github.com/rakunlabs/bw/cluster

How it works

graph LR
    F1["Follower<br/>(read-only)<br/>local read"] -- "write" --> L["Leader<br/>(read-write)<br/>backup diff"]
    L -- "Push (stream)" --> F2["Follower<br/>(read-only)<br/>local read"]
    L -- "Push (stream)" --> F1
    F1 -- "PullReq" --> L
  • Leader election: alan's distributed lock (LeaderLoop). If the leader crashes, another node acquires the lock automatically.
  • Writes: only the leader writes. The application routes writes via IsLeader() check and its own transport, or uses the built-in Forward() helper (see below).
  • Reads: always local. Every node serves reads from its own database.
  • Sync after write: leader calls NotifySync(ctx), which pushes the incremental diff to every behind follower over a QUIC stream and blocks until each one has finished restoring it. Stream completion is the acknowledgement, so when NotifySync returns nil all reachable followers are caught up.
  • Periodic sync: followers pull from the leader every N minutes (default 5) as a safety net for missed pushes.
  • Leader catch-up: a newly elected leader asks all peers for their version and pulls the diff from whichever peer is furthest ahead.
  • Stream transfer: diff data uses alan's SendToStream / HandleStream, so there is no MaxMessageSize cap and the receiver pipes the body directly into bw.DB.Restore.
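
The leader catch-up rule reduces to picking the peer with the highest version above the local one. A minimal sketch, assuming peer versions have already been collected into a map; `furthestAhead` is a hypothetical helper and the tie-break by name is a choice made for this example.

```go
package main

import "fmt"

// furthestAhead returns the peer whose version exceeds local by the most.
// ok is false when no peer is ahead, in which case there is nothing to pull.
func furthestAhead(local uint64, peers map[string]uint64) (id string, ok bool) {
	best := local
	for peer, v := range peers {
		// Ties are broken by peer name so the choice is deterministic.
		if v > best || (ok && v == best && peer < id) {
			id, best, ok = peer, v, true
		}
	}
	return id, ok
}

func main() {
	peers := map[string]uint64{"node-a": 7, "node-b": 9, "node-c": 9}
	fmt.Println(furthestAhead(5, peers))  // node-b true
	fmt.Println(furthestAhead(10, peers)) //  false
}
```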

Usage

package main

import (
    "context"
    "log"
    "time"

    "github.com/rakunlabs/alan"
    "github.com/rakunlabs/bw"
    "github.com/rakunlabs/bw/cluster"
)

type User struct {
    ID   string `bw:"id,pk"`
    Name string `bw:"name,index"`
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 1. Open the database.
    db, err := bw.Open("/var/lib/myapp")
    if err != nil { log.Fatal(err) }
    defer db.Close()

    // 2. Create an alan instance (do NOT call Start yourself).
    a, err := alan.New(alan.Config{
        DNSAddr:  "myapp-headless.default.svc.cluster.local",
        Port:     7946,
        Replicas: 3,
    })
    if err != nil { log.Fatal(err) }

    // 3. Create and start the cluster.
    c := cluster.New(db, a,
        cluster.WithSyncInterval(5*time.Minute),
        cluster.WithLockKey("myapp-leader"),
        cluster.WithOnLeaderChange(func(isLeader bool) {
            log.Println("leader:", isLeader)
        }),
    )
    if err := c.Start(ctx); err != nil { log.Fatal(err) }
    defer c.Stop()

    // 4. Register buckets as usual.
    users, _ := bw.RegisterBucket[User](db, "users")

    // 5. Reads — always local.
    u, _ := users.Get(ctx, "u1")
    _ = u

    // 6. Writes — leader only. NotifySync blocks until followers
    //    have applied the diff (or ctx is cancelled).
    if c.IsLeader() {
        _ = users.Insert(ctx, &User{ID: "u1", Name: "Elif"})
        if err := c.NotifySync(ctx); err != nil {
            log.Printf("notify sync: %v", err)
        }
    }
}

Options

| Option | Default | Description |
| --- | --- | --- |
| WithLockKey(key) | "bw-leader" | Distributed lock name for leader election |
| WithSyncInterval(d) | 5m | How often followers poll the leader |
| WithOnLeaderChange(fn) | nil | Callback when leadership changes |
| WithPrefix(s) | "bw" | Message namespace prefix (see below) |
| WithForwardHandler(fn) | nil | Handler for forwarded requests on the leader |

Message prefix

If the same alan instance is shared by multiple subsystems (e.g. your app uses alan for its own protocol alongside bw/cluster), messages can collide. Every cluster message is prefixed with a namespace string (default "bw"). Messages without the expected prefix are silently ignored.

// Two independent clusters on the same alan instance:
c1 := cluster.New(db1, a, cluster.WithPrefix("users"))
c2 := cluster.New(db2, a, cluster.WithPrefix("orders"))

Forwarding writes to the leader

Forward sends an application-level request to the current leader over alan's request-reply and returns the response. If the calling node is already the leader, the handler runs locally without a network hop.

This eliminates the need for a separate HTTP/gRPC forwarding layer for simple or moderate-sized writes.

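// Declare c first so the handler closure can refer to it; with `c := ...`
// the variable is not yet in scope inside its own initializer.
// The *cluster.Cluster type name is an assumption: use whatever cluster.New returns.
var c *cluster.Cluster
c = cluster.New(db, a,
    cluster.WithForwardHandler(func(ctx context.Context, data []byte) []byte {
        var req CreateUserRequest
        if err := json.Unmarshal(data, &req); err != nil {
            resp, _ := json.Marshal(CreateUserResponse{OK: false})
            return resp
        }

        _ = users.Insert(ctx, &User{ID: req.ID, Name: req.Name})
        _ = c.NotifySync(ctx) // wait for followers to catch up

        resp, _ := json.Marshal(CreateUserResponse{OK: true})
        return resp
    }),
)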

// In your HTTP handler (works on any node):
func (s *Server) CreateUser(w http.ResponseWriter, r *http.Request) {
    body, _ := io.ReadAll(r.Body)
    resp, err := s.cluster.Forward(r.Context(), body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadGateway)
        return
    }
    w.Write(resp)
}

Note: Forward uses alan's QUIC-based transport, so there is no payload size limit.

Documentation

Overview

Package bw is a thin BadgerDB wrapper that provides a typed bucket API and exposes filtering through github.com/rakunlabs/query.

Usage:

db, err := bw.Open("/var/lib/foo")
if err != nil {
    log.Fatal(err)
}
defer db.Close()

users, _ := bw.RegisterBucket[User](db, "users")
_ = users.Insert(ctx, &User{ID: "1", Name: "Alice"})

q, _ := query.Parse("name=Alice&_limit=10")
got, _ := users.Find(ctx, q)

Variables

var (
	// ErrNotFound is returned when a key does not exist in a bucket.
	ErrNotFound = errors.New("bw: not found")
	// ErrNoPK is returned when a typed bucket operation requires a primary
	// key but the registered schema has none and no key extractor was
	// provided.
	ErrNoPK = errors.New("bw: no primary-key field")
	// ErrConflict is returned when a write would violate a unique
	// constraint, or when InsertNew finds the primary key already present.
	ErrConflict = errors.New("bw: unique constraint violated")
	// ErrClosed is returned when an operation is attempted on a closed DB.
	ErrClosed = errors.New("bw: database is closed")
	// ErrReadOnlyTx is returned when a mutating call is made on a
	// read-only transaction.
	ErrReadOnlyTx = errors.New("bw: read-only transaction")
	// ErrNoFTS is returned when Search is called on a bucket with no
	// FTS-tagged fields.
	ErrNoFTS = errors.New("bw: no full-text search fields configured")
	// ErrNoVector is returned when SearchVector is called on a bucket
	// with no vector-tagged field.
	ErrNoVector = errors.New("bw: no vector field configured")
	// ErrDimMismatch is returned when an inserted or queried vector's
	// length does not match the bucket's manifest dimension.
	ErrDimMismatch = errors.New("bw: vector dimension mismatch")
	// ErrVectorEmpty is returned when an empty vector is passed where
	// a non-empty one is required.
	ErrVectorEmpty = errors.New("bw: vector is empty")
)

Sentinel errors returned by bw.

var (
	// DefaultCacheSize is the BlockCacheSize applied by default. Badger's
	// own default is 256 MiB.
	DefaultCacheSize int64 = 100 << 20 // 100 MiB
	// DefaultLogSize is the ValueLogFileSize applied by default. Badger's
	// own default is 1 GiB.
	DefaultLogSize int64 = 100 << 20 // 100 MiB
)

Sensible defaults applied to the underlying badger.Options when the user does not override them via WithBadgerOptions. Badger's stock defaults are tuned for very large datasets (256 MiB block cache, 1 GiB value log file size, 64 MiB memtables) and feel wasteful for the small/medium embedded use-cases bw targets, so we shrink them here.

These are package-level vars on purpose: callers may tune them once at process start without going through WithBadgerOptions.

Types

type Bucket

type Bucket[T any] struct {
	// contains filtered or unexported fields
}

Bucket is a typed view of a Badger key-prefix that stores records of type T. Use RegisterBucket to construct one.

func MigrateBucket

func MigrateBucket[T any](db *DB, name string, opts ...BucketOption[T]) (*Bucket[T], error)

MigrateBucket re-registers a bucket whose struct definition has changed (e.g. new fields were added, indexes added/removed). Unlike a full rebuild, it performs an incremental migration:

  • Fields whose index/unique flags are unchanged are left alone.
  • Only newly indexed/unique fields have their entries built.
  • Only removed indexes/unique constraints have their entries deleted.

This is safe for additive changes (new fields default to their zero value in old records) and for removing/renaming indexed fields. It is NOT safe if you change the primary key field or rename the `bw` tag of the pk field — that requires a manual data migration.

MigrateBucket returns the ready-to-use Bucket, just like RegisterBucket.

func RegisterBucket

func RegisterBucket[T any](db *DB, name string, opts ...BucketOption[T]) (*Bucket[T], error)

RegisterBucket parses T's schema and returns a typed Bucket bound to db.

The bucket name is used as the key prefix and must not contain NUL bytes. The schema is cached per type, so repeated calls are cheap.

T may be any struct type. The default codec (msgpack via shamaton) is reflect-based, so no codegen step is required: declare your bucket type and call RegisterBucket.

func (*Bucket[T]) Count

func (b *Bucket[T]) Count(ctx context.Context, q *query.Query) (uint64, error)

Count scans the bucket and returns the number of records matching q. Sort, Offset and Limit from q are ignored.

func (*Bucket[T]) CountTx added in v0.1.3

func (b *Bucket[T]) CountTx(tx *Tx, q *query.Query) (uint64, error)

CountTx is like Count but operates on a caller-controlled transaction.

func (*Bucket[T]) Delete

func (b *Bucket[T]) Delete(ctx context.Context, key any) error

Delete removes the record with the given key, plus every index/unique entry attached to it. The call is idempotent: deleting a missing key returns nil.

func (*Bucket[T]) DeleteTx

func (b *Bucket[T]) DeleteTx(tx *Tx, key any) error

DeleteTx removes the record with the given key within a caller-controlled transaction. Idempotent: deleting a missing key returns nil.

func (*Bucket[T]) Find

func (b *Bucket[T]) Find(ctx context.Context, q *query.Query) ([]*T, error)

Find scans the bucket and returns every record matching q. Sort, Offset and Limit from q are honoured. q may be nil (returns every record).

When q's top-level Where contains an equality, IN-list, or range comparison on an `index`-tagged field, Find executes via an index seek and falls back to a full scan only for the residual filter.

Sort is performed on the typed *T slice rather than at the engine layer, so the codec only decodes each value once.

func (*Bucket[T]) FindAndUpdate

func (b *Bucket[T]) FindAndUpdate(ctx context.Context, q *query.Query, fn func(*T) (*T, error)) error

FindAndUpdate finds all records matching q, calls fn on each one, and writes back the results — all inside a single read-write transaction.

fn receives each matched record and returns either:

  • a (possibly modified) *T to write back, or
  • nil to skip that record (no write).

If fn returns an error, the entire transaction is rolled back. Sort, Offset, and Limit from q are honoured before fn is called.
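
The callback contract (return a record to write, nil to skip, an error to roll back) can be modelled in memory as follows. This is a sketch of the semantics only: it uses a plain map instead of a Badger transaction, and `findAndUpdate`, `user` and `errBoom` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type user struct {
	ID  string
	Age int
}

var errBoom = errors.New("boom")

// findAndUpdate stages every write and applies them only if fn never fails,
// mimicking the all-or-nothing transaction described above.
func findAndUpdate(store map[string]user, match func(user) bool, fn func(*user) (*user, error)) error {
	staged := map[string]user{}
	for id, u := range store {
		if !match(u) {
			continue
		}
		cp := u // fn works on a copy so the store is untouched on failure
		out, err := fn(&cp)
		if err != nil {
			return err // nothing staged is applied: rollback
		}
		if out == nil {
			continue // nil means "skip this record"
		}
		staged[id] = *out
	}
	for id, u := range staged {
		store[id] = u
	}
	return nil
}

func main() {
	store := map[string]user{"1": {"1", 30}, "2": {"2", 25}}
	adult := func(u user) bool { return u.Age >= 30 }

	_ = findAndUpdate(store, adult, func(u *user) (*user, error) {
		u.Age++
		return u, nil
	})
	fmt.Println(store["1"].Age, store["2"].Age) // 31 25

	err := findAndUpdate(store, adult, func(u *user) (*user, error) {
		u.Age = 0
		return nil, errBoom
	})
	fmt.Println(err, store["1"].Age) // boom 31
}
```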

func (*Bucket[T]) FindTx added in v0.1.3

func (b *Bucket[T]) FindTx(tx *Tx, q *query.Query) ([]*T, error)

FindTx is like Find but operates on a caller-controlled transaction. Sort/Offset/Limit/Select from q are honoured. Use this when the caller is mid-transaction and needs to observe pending writes.

func (*Bucket[T]) Get

func (b *Bucket[T]) Get(ctx context.Context, key any) (*T, error)

Get retrieves the record stored under key. Returns ErrNotFound if there is no such key.

key may be string, []byte, an integer, or any value matching the type of the pk field; non-string types are encoded the same way as during Insert.

func (*Bucket[T]) GetTx added in v0.1.3

func (b *Bucket[T]) GetTx(tx *Tx, key any) (*T, error)

GetTx is like Get but operates on a caller-controlled transaction. Use it when a downstream Insert/Update/Delete needs to observe writes that have not yet been committed: the bucket's standalone Get opens its own read-only view, and Badger's MVCC isolation hides in-flight writes from that view.

func (*Bucket[T]) Insert

func (b *Bucket[T]) Insert(ctx context.Context, record *T) error

Insert encodes record and writes it under its primary key, maintaining every index/unique entry along the way. If the key already exists the record is overwritten and stale index entries are removed.

If the schema declares a `unique` field whose value is already owned by another pk, ErrConflict is returned and the transaction is rolled back.

func (*Bucket[T]) InsertMany

func (b *Bucket[T]) InsertMany(ctx context.Context, records []*T) error

InsertMany inserts all records in a single read-write transaction. If any record fails (e.g. a unique constraint violation), the entire batch is rolled back and the error is returned.

func (*Bucket[T]) InsertNew

func (b *Bucket[T]) InsertNew(ctx context.Context, record *T) error

InsertNew is like Insert but returns ErrConflict if the primary key already exists. Use it when you want strict create-only semantics.

func (*Bucket[T]) InsertNewTx

func (b *Bucket[T]) InsertNewTx(tx *Tx, record *T) error

InsertNewTx is like InsertNew but operates on a caller-controlled transaction. Returns ErrConflict if the primary key already exists.

func (*Bucket[T]) InsertTx

func (b *Bucket[T]) InsertTx(tx *Tx, record *T) error

InsertTx is like Insert but operates on a caller-controlled transaction. Use it to batch multiple writes into a single atomic commit.

func (*Bucket[T]) Name

func (b *Bucket[T]) Name() string

Name returns the bucket's key prefix name.

func (*Bucket[T]) Search added in v0.1.2

func (b *Bucket[T]) Search(ctx context.Context, query string, limit, offset int) ([]SearchResult[T], uint64, error)

Search performs a full-text search over the bucket's FTS-tagged fields using Bleve's query string syntax. It returns matched records hydrated from BadgerDB, ordered by relevance score (highest first).

Returns ErrNoFTS if the bucket has no FTS-tagged fields. limit and offset control pagination; a limit of 0 falls back to the default of 10.

func (*Bucket[T]) SearchVector added in v0.2.0

func (b *Bucket[T]) SearchVector(ctx context.Context, q []float32, opts ...SearchVectorOptions) ([]VectorHit[T], error)

SearchVector returns the top-K records whose vector field is closest to q under the configured distance metric. The bucket must declare a `vector`-tagged field at registration time; otherwise ErrNoVector is returned. Stage A implementation is brute-force over all non-deleted vectors; recall is exact.

opts is variadic for ergonomics: SearchVector(ctx, q) takes the schema defaults (K=10, schema metric, no filter). Pass at most one SearchVectorOptions to override.

When opts.Filter is non-nil it must be a *query.Query; the bucket resolves it via the ordinary Find path (so any indexable predicate the planner understands is fast) and the vector pass only sees pks that survived the filter.
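
A brute-force top-K pass with an optional pre-filter looks roughly like the sketch below. Cosine distance is an assumption here (the bucket's configured metric may differ), and `topK`, `hit` and `cosineDist` are illustrative names rather than bw's internals.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

type hit struct {
	ID   string
	Dist float64
}

// cosineDist returns 1 - cosine similarity; zero vectors get distance 1.
func cosineDist(a, b []float32) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		na += float64(a[i]) * float64(a[i])
		nb += float64(b[i]) * float64(b[i])
	}
	if na == 0 || nb == 0 {
		return 1
	}
	return 1 - dot/math.Sqrt(na*nb)
}

// topK scores every vector that passes keep and returns the k closest.
// keep plays the role of the filter: only surviving ids are scored.
func topK(q []float32, vecs map[string][]float32, k int, keep func(id string) bool) []hit {
	var hits []hit
	for id, v := range vecs {
		if len(v) != len(q) {
			continue // this sketch skips mismatched dimensions
		}
		if keep != nil && !keep(id) {
			continue
		}
		hits = append(hits, hit{id, cosineDist(q, v)})
	}
	sort.Slice(hits, func(i, j int) bool {
		if hits[i].Dist != hits[j].Dist {
			return hits[i].Dist < hits[j].Dist
		}
		return hits[i].ID < hits[j].ID
	})
	if k >= 0 && k < len(hits) {
		hits = hits[:k]
	}
	return hits
}

func main() {
	vecs := map[string][]float32{"a": {1, 0}, "b": {0, 1}, "c": {1, 1}}
	for _, h := range topK([]float32{1, 0}, vecs, 2, nil) {
		fmt.Printf("%s %.3f\n", h.ID, h.Dist)
	}
}
```

Because every candidate is scored, recall is exact; the cost grows linearly with the number of vectors that survive the filter.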

func (*Bucket[T]) Update

func (b *Bucket[T]) Update(ctx context.Context, record *T) error

Update is an alias for Insert. Both perform a read-old-then-write-new so the indexes stay consistent.

func (*Bucket[T]) UpdateTx

func (b *Bucket[T]) UpdateTx(tx *Tx, record *T) error

UpdateTx is like Update but operates on a caller-controlled transaction.

func (*Bucket[T]) Walk

func (b *Bucket[T]) Walk(ctx context.Context, q *query.Query, fn func(*T) error) error

Walk streams matching records to fn. Sort in q is ignored; Offset and Limit are honoured.

func (*Bucket[T]) WalkTx added in v0.1.3

func (b *Bucket[T]) WalkTx(tx *Tx, q *query.Query, fn func(*T) error) error

WalkTx is like Walk but operates on a caller-controlled transaction.

type BucketOption

type BucketOption[T any] func(*Bucket[T])

BucketOption configures a Bucket at registration time.

func WithEmbedder added in v0.2.0

func WithEmbedder[T any](fn func(ctx context.Context, record *T) ([]float32, error)) BucketOption[T]

WithEmbedder installs an embedding function that bw calls during Insert when the record's `vector`-tagged field is empty. Returning a non-nil error from the embedder fails the Insert; the data write is rolled back along with any other index maintenance, so partial state is impossible.

Records that already carry a populated vector skip the embedder, so callers can mix "auto-embed by text" and "I already computed the vector" calls in the same bucket.

func WithKeyFn

func WithKeyFn[T any](fn func(*T) ([]byte, error)) BucketOption[T]

WithKeyFn supplies a custom primary-key extractor. Use this when the record type has no `pk`-tagged field, or when the natural key is composed from multiple fields.

func WithMigrationProgress added in v0.2.0

func WithMigrationProgress[T any](fn MigrationProgress) BucketOption[T]

WithMigrationProgress installs a progress hook. Called once per batch and once at completion of each step.

func WithRawMigration added in v0.2.0

func WithRawMigration[T any](fromV, toV uint64, fn RawMigrationFn) BucketOption[T]

WithRawMigration registers a byte-level migration step from fromV to toV. fn is invoked once per record in the bucket; the returned bytes replace the record in place.

The migration runs inside RegisterBucket when the stored bucket version equals fromV. Multiple migrations can be chained by calling WithRawMigration repeatedly with consecutive (fromV, toV) pairs.

func WithTypedMigration added in v0.2.0

func WithTypedMigration[Old, New any](fromV, toV uint64, fn func(ctx context.Context, old *Old) (*New, error)) BucketOption[New]

WithTypedMigration registers a typed migration step. Old is the previous record shape; the supplied function reads it and returns the new record (which must be encodable by the bucket's codec).

Old does not need to live in any specific package: define it alongside the migration with whatever bw tags it had at the time of the previous schema version. The runner unmarshals raw bytes into *Old, calls fn, and re-marshals the *New it returns.

Use this when both shapes are full Go structs you maintain. For quick map-style edits without defining the old struct, use WithRawMigration with codec.UnmarshalMap.

func WithVectorParams added in v0.2.0

func WithVectorParams[T any](M, efConstruction int) BucketOption[T]

WithVectorParams overrides the HNSW knobs for the bucket's vector field. M is the graph degree (neighbours kept per node, default 16); efConstruction is the candidate-set size used during inserts (default 200). Pass zero to leave the corresponding default in place.

Once a vector has been inserted, the values are persisted to the manifest and become immutable — changing them later orphans existing neighbour lists. Pick once, ideally before the first insert.

func WithVectorReembed added in v0.2.0

func WithVectorReembed[T any](fromV, toV uint64, embedder func(ctx context.Context, record *T) ([]float32, error)) BucketOption[T]

WithVectorReembed registers a migration step that only recomputes the bucket's vector field. Useful when switching embedding models (the data shape is unchanged, only the embedding's dim or its numeric values shift).

The bucket must declare exactly one `vector` field; the embedder is invoked per record and its output replaces the record's existing vector slice. Other fields pass through unchanged.

func WithVersion

func WithVersion[T any](v uint64) BucketOption[T]

WithVersion sets the schema version for this bucket. When the stored version is lower than the provided one, RegisterBucket automatically performs an incremental migration (only rebuilding indexes for changed fields) instead of failing with a fingerprint mismatch.

Bump this number each time you change the struct's index/unique surface.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a bw database handle. It wraps a *badger.DB plus the codec and any registered bucket schemas.

func Open

func Open(path string, opts ...Option) (*DB, error)

Open opens (or creates) a bw database at the given filesystem path.

If options include WithBadgerOptions, the embedded badger options take precedence and path is ignored. WithInMemory(true) overrides path and the badger options' Dir/ValueDir.

func (*DB) Backup

func (db *DB) Backup(w io.Writer, since uint64, deletedData bool) (uint64, error)

Backup creates a backup of all entries with version > since.

When deletedData is false, it uses Badger's built-in Backup which skips delete markers. When true, it walks all versions (including deletes) so the backup can later be used with BackupUntil for point-in-time restore.

func (*DB) BackupUntil

func (db *DB) BackupUntil(w io.Writer, until uint64) (uint64, error)

BackupUntil creates a backup containing only entries with version <= until. It iterates through all versions of each key (including past delete markers) so that older live versions are included when their version falls within range.

func (*DB) Badger

func (db *DB) Badger() *badger.DB

Badger returns the underlying *badger.DB. Use sparingly: writes that bypass bw's bucket abstraction will not maintain indexes.

func (*DB) Begin

func (db *DB) Begin() *Tx

Begin starts a read-write transaction. The caller is responsible for calling Commit or Discard.

func (*DB) BeginRead

func (db *DB) BeginRead() *Tx

BeginRead starts a read-only transaction.

func (*DB) Close

func (db *DB) Close() error

Close closes the underlying Badger database. The full-text-search and vector indexes live entirely as Badger keys, so there is nothing else to release.

func (*DB) Codec

func (db *DB) Codec() codec.Codec

Codec returns the codec the DB was opened with.

func (*DB) Restore

func (db *DB) Restore(r io.Reader) error

Restore loads a backup from the given reader, replacing the current database contents. The reader should contain data previously written by Backup or BackupUntil.

Full-text search state is stored as ordinary Badger keys under the \x00fts\x00 namespace (see keys.go), so the backup stream carries FTS postings, doc lengths, and corpus stats alongside data. Restore is therefore portable to any directory: the receiving DB sees the same view of every Bucket[T].Search the source produced. No rebuild step is required.

func (*DB) Update

func (db *DB) Update(fn func(tx *Tx) error) error

Update runs fn inside a read-write transaction. If fn returns nil, the transaction is committed; otherwise it is discarded.

func (*DB) Version

func (db *DB) Version() uint64

Version returns the maximum committed transaction version in the database. This is useful for incremental backups (pass the returned value as the "since" parameter to a subsequent Backup call).

func (*DB) View

func (db *DB) View(fn func(tx *Tx) error) error

View runs fn inside a read-only transaction. The transaction is automatically discarded after fn returns.

func (*DB) WaitForFTSSync added in v0.2.0

func (db *DB) WaitForFTSSync(_ context.Context) error

WaitForFTSSync is retained for API stability with the prior outbox-based implementation. Writes are now synchronous (committed inside the same Badger txn as the data), so this method always returns immediately.

func (*DB) Wipe added in v0.1.3

func (db *DB) Wipe() error

Wipe drops every key from the database — data, indexes, unique reservations, and bucket schema metadata — and resets every registered FTS index. It is destructive and irreversible. Use it when you want to follow up with Restore for a clean swap.

In-process bucket handles returned from RegisterBucket remain valid after Wipe: their schema, codec and FTS pointers are mutated in place. The next write into a bucket re-establishes its meta keys transparently.

Caveats inherited from Badger:

  • DropAll blocks all writes for the duration of the wipe.
  • DropAll is NOT safe to run concurrently with reads. Quiesce all RPCs that touch the DB before calling Wipe.

type DefaultTokenizer added in v0.2.0

type DefaultTokenizer struct {
	// MinLen is the inclusive minimum rune count for a token to be
	// retained. Zero or negative means 1 (single-character tokens
	// allowed).
	MinLen int
}

DefaultTokenizer splits on Unicode non-letter/non-digit boundaries, lowercases each run, and skips runs shorter than MinLen. It works reasonably for Latin and most BMP scripts; for serious Turkish stemming, plug in a domain-specific tokenizer via WithFTSTokenizer.

func (DefaultTokenizer) Tokenize added in v0.2.0

func (d DefaultTokenizer) Tokenize(text string) []string

Tokenize splits text into normalised terms.
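
The behaviour described for DefaultTokenizer can be approximated with the standard library alone. The following is a minimal sketch, not the package's source: the type name sketchTokenizer is hypothetical, and treating a non-positive MinLen as 1 follows the field comment above.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
	"unicode/utf8"
)

// sketchTokenizer is a hypothetical stand-in that follows the behaviour
// documented for DefaultTokenizer; it is not the library's implementation.
type sketchTokenizer struct {
	MinLen int // inclusive minimum rune count; <= 0 is treated as 1
}

// Tokenize splits on runes that are neither letters nor digits,
// lowercases each run, and drops runs shorter than MinLen.
func (s sketchTokenizer) Tokenize(text string) []string {
	minLen := s.MinLen
	if minLen <= 0 {
		minLen = 1
	}
	runs := strings.FieldsFunc(text, func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
	terms := make([]string, 0, len(runs))
	for _, run := range runs {
		if utf8.RuneCountInString(run) < minLen {
			continue // too short to be retained
		}
		terms = append(terms, strings.ToLower(run))
	}
	return terms
}

func main() {
	tok := sketchTokenizer{MinLen: 2}
	fmt.Println(tok.Tokenize("Hello, World! a 42"))
	// Output: [hello world 42]
}
```

Length is counted in runes rather than bytes, so multi-byte letters are measured the same way as ASCII ones.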

type MigrationProgress added in v0.2.0

type MigrationProgress func(bucket string, fromV, toV uint64, processed, total uint64)

MigrationProgress is the callback invoked between batches by the migration runner. Processed counts records migrated so far in the current step; total is the bucket-wide record count captured before the step began (so it stays stable across the call).
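
A callback of this shape might simply log a percentage per batch. The sketch below redeclares the type locally so it compiles on its own; logProgress, percentDone, and the simulated runner loop are hypothetical, and how the callback is registered with the migration runner is not shown here.

```go
package main

import "fmt"

// MigrationProgress mirrors the callback signature documented above.
type MigrationProgress func(bucket string, fromV, toV uint64, processed, total uint64)

// percentDone converts the processed/total pair into a percentage.
// An empty bucket (total == 0) is reported as complete; this is a
// choice made for the sketch, not documented behaviour.
func percentDone(processed, total uint64) float64 {
	if total == 0 {
		return 100
	}
	return float64(processed) / float64(total) * 100
}

// logProgress is a hypothetical callback that prints one line per batch.
func logProgress(bucket string, fromV, toV uint64, processed, total uint64) {
	fmt.Printf("%s v%d->v%d: %d/%d (%.1f%%)\n",
		bucket, fromV, toV, processed, total, percentDone(processed, total))
}

func main() {
	var cb MigrationProgress = logProgress
	// Simulate a runner reporting after each batch of 250 records.
	const total = 1000
	for processed := uint64(250); processed <= total; processed += 250 {
		cb("users", 1, 2, processed, total)
	}
}
```

Because total is captured before the step begins, the percentage never moves backwards while a step is running.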

type Option

type Option func(*options)

Option configures a *DB at Open time.

func WithBadgerOptions

func WithBadgerOptions(bo badger.Options) Option

WithBadgerOptions overrides the underlying badger.Options entirely.

When provided, bw does NOT apply DefaultCacheSize / DefaultLogSize: the caller is assumed to know what they want. The path argument to Open is also ignored in favour of bo.Dir / bo.ValueDir.

func WithCodec

func WithCodec(c codec.Codec) Option

WithCodec sets the codec used to serialize record values. The default is msgpack (codec.MsgPack).

func WithInMemory

func WithInMemory(b bool) Option

WithInMemory configures Badger to run entirely in memory. Convenient for tests; the path argument to Open is ignored.

func WithLogger

func WithLogger(l badger.Logger) Option

WithLogger sets the badger.Logger. Pass nil to silence Badger entirely.

type RawMigrationFn added in v0.2.0

type RawMigrationFn func(ctx context.Context, raw []byte) ([]byte, error)

RawMigrationFn transforms the encoded bytes of one record. It must be deterministic and idempotent (the runner may legitimately replay the same record on resume).
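
The idempotency requirement is easiest to meet by checking for the target state before changing anything. The sketch below assumes JSON-encoded records purely to stay within the standard library (the default codec is msgpack); the addStatus function and the "status" field are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// RawMigrationFn mirrors the signature documented above.
type RawMigrationFn func(ctx context.Context, raw []byte) ([]byte, error)

// addStatus is a hypothetical migration that adds a "status" field
// with a default value. It only writes the field when it is absent,
// so replaying it on an already-migrated record changes nothing.
func addStatus(_ context.Context, raw []byte) ([]byte, error) {
	var rec map[string]any
	if err := json.Unmarshal(raw, &rec); err != nil {
		return nil, fmt.Errorf("decode record: %w", err)
	}
	if _, ok := rec["status"]; !ok {
		rec["status"] = "active"
	}
	// json.Marshal sorts map keys, which keeps the output deterministic.
	return json.Marshal(rec)
}

func main() {
	var fn RawMigrationFn = addStatus
	ctx := context.Background()

	once, err := fn(ctx, []byte(`{"id":"1","name":"A"}`))
	if err != nil {
		panic(err)
	}
	twice, err := fn(ctx, once) // simulate a replay after resume
	if err != nil {
		panic(err)
	}
	fmt.Println(string(once))
	fmt.Println(string(once) == string(twice))
}
```

A migration that appended to a list or incremented a counter unconditionally would fail this replay check.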

type SearchHit added in v0.1.2

type SearchHit struct {
	ID    string
	Score float64
}

SearchHit is one result returned by ftsIndex.search. ID is the raw (string-coerced) primary key bytes; Score is the BM25 sum across query terms.

type SearchResult added in v0.1.2

type SearchResult[T any] struct {
	Record *T
	Score  float64
}

SearchResult holds a matched record along with its relevance score.

type SearchVectorOptions added in v0.2.0

type SearchVectorOptions struct {
	// K is the number of hits to return. Zero means 10.
	K int
	// EfSearch tunes HNSW recall vs latency: bigger = higher
	// recall, slower. Zero defaults to 100 (or to K, whichever is
	// larger). Has no effect on the brute-force fallback path.
	EfSearch int
	// Metric overrides the field's default metric. Zero (MetricDefault)
	// keeps the schema setting.
	Metric VectorMetric
	// Filter, when non-nil, restricts the candidate set. The bucket
	// resolves the query into matching pks via the standard Find
	// machinery before the vector pass runs. Type is any so the
	// vector module doesn't import query directly; bucket.go casts.
	Filter any
}

SearchVectorOptions tunes a single SearchVector call.

Filter is wired in by Bucket.SearchVector — it pre-resolves the query into a pk allow-set and hands the set to the vector index, so vectorIndex itself stays free of any query-engine dependency.
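
The allow-set idea can be illustrated with a brute-force scan. This is a sketch under stated assumptions, not the package's vector index: the hit type, the searchFiltered function, and the in-memory map of vectors are all hypothetical, and dot product stands in for whichever metric is configured. Treating a negative k like zero is also an assumption; only the zero case is documented.

```go
package main

import (
	"fmt"
	"sort"
)

// hit pairs a primary key with its similarity score (hypothetical type).
type hit struct {
	PK    string
	Score float64
}

// dot returns the dot product of two equal-length vectors.
func dot(a, b []float32) float64 {
	var s float64
	for i := range a {
		s += float64(a[i]) * float64(b[i])
	}
	return s
}

// searchFiltered scans every stored vector, skips primary keys missing
// from allow (a nil allow-set means "no filter"), and returns the k
// best hits. k <= 0 falls back to 10.
func searchFiltered(vectors map[string][]float32, q []float32, allow map[string]struct{}, k int) []hit {
	if k <= 0 {
		k = 10
	}
	hits := make([]hit, 0, len(vectors))
	for pk, v := range vectors {
		if allow != nil {
			if _, ok := allow[pk]; !ok {
				continue // rejected by the pre-resolved filter
			}
		}
		hits = append(hits, hit{PK: pk, Score: dot(q, v)})
	}
	// Highest score first; ties broken by pk so the order is stable.
	sort.Slice(hits, func(i, j int) bool {
		if hits[i].Score != hits[j].Score {
			return hits[i].Score > hits[j].Score
		}
		return hits[i].PK < hits[j].PK
	})
	if len(hits) > k {
		hits = hits[:k]
	}
	return hits
}

func main() {
	vectors := map[string][]float32{
		"1": {1, 0},
		"2": {0.9, 0.1},
		"3": {0, 1},
	}
	allow := map[string]struct{}{"2": {}, "3": {}} // pks matched by the filter
	for _, h := range searchFiltered(vectors, []float32{1, 0}, allow, 1) {
		fmt.Println(h.PK) // prints "2": pk "1" scores higher but is filtered out
	}
}
```

Resolving the filter first means the vector pass only needs a set-membership check and never has to evaluate query expressions itself.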

type Tokenizer added in v0.2.0

type Tokenizer interface {
	Tokenize(text string) []string
}

Tokenizer turns a raw field value into the lower-cased, normalised terms that participate in the inverted index. Implementations must be deterministic — search uses the same Tokenizer to break the query string, so any inconsistency makes documents unreachable.
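
As an illustration of a custom implementation, the sketch below lowercases with Turkish casing rules (I becomes ı, İ becomes i) before splitting. The turkishTokenizer type is hypothetical and the interface is redeclared locally so the example compiles on its own; it performs case folding only, not stemming.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// Tokenizer mirrors the interface documented above.
type Tokenizer interface {
	Tokenize(text string) []string
}

// turkishTokenizer is a hypothetical implementation that applies
// Turkish case mapping before splitting on non-letter/non-digit runes.
type turkishTokenizer struct{}

func (turkishTokenizer) Tokenize(text string) []string {
	lower := strings.ToLowerSpecial(unicode.TurkishCase, text)
	return strings.FieldsFunc(lower, func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
}

func main() {
	var tok Tokenizer = turkishTokenizer{}

	// The same tokenizer handles both the indexed text and the query,
	// so an upper-case document term matches a lower-case query term.
	indexed := tok.Tokenize("IŞIK İstanbul")
	queried := tok.Tokenize("ışık")

	fmt.Println(indexed)
	fmt.Println(indexed[0] == queried[0])
}
```

Had the index been built with plain strings.ToLower, "IŞIK" would be stored as "işik" and the query "ışık" would never reach it, which is the kind of inconsistency the determinism requirement guards against.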

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx is a transaction handle wrapping a *badger.Txn.

Use db.View(fn) for a read-only transaction or db.Update(fn) for a read-write transaction. db.Begin/BeginRead are also available for users who need finer control over the lifecycle.

func (*Tx) Badger

func (tx *Tx) Badger() *badger.Txn

Badger returns the underlying *badger.Txn for advanced use.

func (*Tx) Commit

func (tx *Tx) Commit() error

Commit commits the transaction. Calling Commit on a read-only transaction is a no-op apart from discarding it.

func (*Tx) DB

func (tx *Tx) DB() *DB

DB returns the parent DB.

func (*Tx) Discard

func (tx *Tx) Discard()

Discard releases the transaction without committing.

func (*Tx) ReadOnly

func (tx *Tx) ReadOnly() bool

ReadOnly reports whether the transaction is read-only.

type VectorHit added in v0.2.0

type VectorHit[T any] struct {
	Record *T
	Score  float64
}

VectorHit is one result returned by Bucket.SearchVector.

type VectorMetric added in v0.2.0

type VectorMetric uint8

VectorMetric selects the distance function used at search time. Higher Score in a VectorHit always means "more similar": cosine and dot are returned as-is; L2 is negated so the heap invariant matches the other metrics.

const (
	// MetricDefault tells SearchVector to use the metric configured
	// in the field's schema tag (or Cosine if none was set).
	MetricDefault VectorMetric = 0
	// Cosine similarity, expects normalised vectors for best
	// numerical behaviour.
	Cosine VectorMetric = 1
	// DotProduct similarity. Equivalent to Cosine when both vectors
	// are unit-normalised; faster otherwise (no magnitudes).
	DotProduct VectorMetric = 2
	// Euclidean (L2) distance; returned as -|a-b| so the highest
	// score is still the most similar.
	Euclidean VectorMetric = 3
)
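
The "higher is more similar" convention can be checked with plain implementations of the three metrics. This is a sketch, not the package's code: the function names are hypothetical, and returning 0 for a zero-magnitude vector in cosine is an assumption made here to avoid dividing by zero.

```go
package main

import (
	"fmt"
	"math"
)

// dot returns the dot product of two equal-length vectors.
func dot(a, b []float64) float64 {
	var s float64
	for i := range a {
		s += a[i] * b[i]
	}
	return s
}

// cosine returns the cosine similarity, or 0 if either vector has
// zero magnitude (an assumption of this sketch).
func cosine(a, b []float64) float64 {
	den := math.Sqrt(dot(a, a)) * math.Sqrt(dot(b, b))
	if den == 0 {
		return 0
	}
	return dot(a, b) / den
}

// negL2 returns the Euclidean distance negated, i.e. -|a-b|, so that
// a larger value still means a closer vector.
func negL2(a, b []float64) float64 {
	var sum float64
	for i := range a {
		d := a[i] - b[i]
		sum += d * d
	}
	return -math.Sqrt(sum)
}

func main() {
	q := []float64{1, 0}
	near := []float64{2, 0} // same direction as q
	far := []float64{0, 2}  // orthogonal to q

	// Under every metric the nearer vector gets the higher score.
	fmt.Println(cosine(q, near) > cosine(q, far))
	fmt.Println(dot(q, near) > dot(q, far))
	fmt.Println(negL2(q, near) > negL2(q, far))
}
```

With the sign flipped on L2, a single max-first ordering serves all three metrics.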

func (VectorMetric) String added in v0.2.0

func (m VectorMetric) String() string

Directories

Path Synopsis
Package cluster provides distributed synchronization for bw databases using the alan UDP/QUIC peer discovery library.
Package codec defines the Codec interface used by bw to serialize and deserialize records stored in BadgerDB, plus the default implementations.
Package engine implements the query → BadgerDB execution pipeline: dot-path field lookup on decoded records, evaluation of query.Expression against those records, and a full-scan executor.
example
cluster command
markdown command
Markdown FTS demo.
migration command
User migration demo.
vector command
Vector + filter + embedder demo.
internal
keyenc
Package keyenc encodes Go values into byte slices whose lexicographic order matches the natural order of the underlying value.
Package schema parses `bw:"..."` struct tags into a cached schema describing the primary key, secondary indexes and unique constraints of a record type.
