bloomsearch

Version: v0.2.0
Published: Feb 15, 2026 | License: AGPL-3.0 | Imports: 21 | Imported by: 0

README

BloomSearch

Keyword search engine with hierarchical bloom filters for massive datasets

BloomSearch provides extremely low memory usage and low-latency cold-start searches through pluggable storage interfaces.

  • Memory efficient: Bloom filters have constant size regardless of data volume
  • Pluggable storage: DataStore and MetaStore interfaces for any backend (can be same or separate)
  • Fast filtering: Hierarchical pruning via partitions, minmax indexes, and bloom filters
  • Flexible queries: Search by field, token, or field:token with AND/OR combinators
  • Disaggregated storage and compute: Unbounded ingest and query throughput

Perfect for logs, JSON documents, and high-cardinality keyword search.

Quick start

go get github.com/danthegoodman1/bloomsearch
ctx := context.Background()

// Initialize stores
dataStore := NewFileSystemDataStore("./data")
metaStore := dataStore // FileSystemDataStore also implements MetaStore

// Create engine with default config
engine, err := NewBloomSearchEngine(DefaultBloomSearchEngineConfig(), metaStore, dataStore)
if err != nil {
    log.Fatal(err)
}
engine.Start()

// Insert data asynchronously (no wait for flush)
if err := engine.IngestRows(ctx, []map[string]any{{
    "level":   "error",
    "message": "database connection failed",
    "service": "auth",
}}, nil); err != nil {
    log.Fatal(err)
}

// Provide a `chan error` to wait for flush
doneChan := make(chan error)
if err := engine.IngestRows(ctx, []map[string]any{{
    "level":   "info",
    "message": "login successful",
    "service": "auth",
}}, doneChan); err != nil {
    log.Fatal(err)
}
if err := <-doneChan; err != nil {
    log.Fatal(err)
}

// Collect the resulting rows that match
resultChan := make(chan map[string]any, 100)
// If any of the workers error, they report it here
errorChan := make(chan error, 10)

err = engine.Query(
    ctx,
    // Query for rows where `.level: "error"` (token must be in the "level" field)
    NewQuery().FieldToken("level", "error").Build(),
    resultChan,
    errorChan,
    nil, // optional per-block stats channel (chan<- BlockStats)
)
if err != nil {
    log.Fatal(err)
}

// Process results
for {
    select {
    case <-ctx.Done():
        return
    case row, ok := <-resultChan:
        if !ok {
            // Channel closed: no active workers remain
            return
        }
        // Process matching row
        fmt.Printf("Found row: %+v\n", row)
    case err := <-errorChan:
        log.Printf("Query error: %v", err)
        // Continue processing other results, or cancel context
    }
}

See tests for complete working examples, including partitioning and minmax index filtering.

Concepts

Bloom filters

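Bloom filters are a probabilistic data structure for testing set membership. They guarantee no false negatives but allow a tunable false-positive rate. A filter's size is fixed when it is created, so it stays constant regardless of data volume (its false-positive rate rises if far more items are inserted than it was sized for), with extremely fast lookups and minimal memory usage.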
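To make the size versus false-positive trade-off concrete, here is a minimal, self-contained Bloom filter in Go. It is an illustrative sketch, not BloomSearch's implementation (the package's types reference a third-party `bloom.BloomFilter`). It sizes the bit array with the standard formulas m = -n·ln(p)/(ln 2)² and k = (m/n)·ln 2, and derives the k probe positions by double hashing two FNV hashes. The names `newBloom`, `add`, and `mayContain` are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

// bloomFilter is a hypothetical, illustrative Bloom filter: a bit array plus k probes.
type bloomFilter struct {
	bits []uint64 // bit array packed into 64-bit words
	m    uint64   // number of bits
	k    uint64   // number of hash probes per item
}

// newBloom sizes the filter for n expected items at false-positive rate p,
// using m = -n*ln(p)/(ln 2)^2 and k = (m/n)*ln 2.
func newBloom(n uint, p float64) *bloomFilter {
	m := uint64(math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)))
	k := uint64(math.Max(1, math.Round(float64(m)/float64(n)*math.Ln2)))
	return &bloomFilter{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// hashes returns two 64-bit hashes of item for double hashing.
func hashes(item string) (uint64, uint64) {
	h1 := fnv.New64a()
	h1.Write([]byte(item))
	h2 := fnv.New64()
	h2.Write([]byte(item))
	return h1.Sum64(), h2.Sum64() | 1 // |1 keeps the probe step non-zero
}

// add sets the k bits for item.
func (b *bloomFilter) add(item string) {
	h1, h2 := hashes(item)
	for i := uint64(0); i < b.k; i++ {
		pos := (h1 + i*h2) % b.m
		b.bits[pos/64] |= 1 << (pos % 64)
	}
}

// mayContain returns false only if item was definitely never added.
func (b *bloomFilter) mayContain(item string) bool {
	h1, h2 := hashes(item)
	for i := uint64(0); i < b.k; i++ {
		pos := (h1 + i*h2) % b.m
		if b.bits[pos/64]&(1<<(pos%64)) == 0 {
			return false
		}
	}
	return true
}

func main() {
	f := newBloom(1000, 0.01)
	f.add("error")
	fmt.Println(f.m, f.k, f.mayContain("error"))
}
```

With 1,000 expected items and p = 0.01, the formulas give 9,586 bits (about 1.2 KB) and 7 hash functions, independent of how many rows those items came from.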

Search types

BloomSearch supports three types of searches against JSON documents:

Given example log records:

{"level": "error", "service": "auth", "message": "login failed", "user_id": 123}
{"level": "info", "service": "payment", "message": "payment processed", "amount": 50.00}
{"level": "error", "service": "payment", "message": "database timeout", "retry_count": 3}

Field search - Find records containing a specific field path:

// Find all records with "retry_count" field
query := NewQuery().Field("retry_count").Build()

Token search - Find records containing a value anywhere:

// Find all records containing "error" in any field
query := NewQuery().Token("error").Build()

Field:token search - Find records with a specific value in a specific field:

// Find all records where `.service: "payment"`
query := NewQuery().FieldToken("service", "payment").Build()

Complex combinations:

// (field AND token) OR fieldtoken
query := NewQuery().
    Match(
        Or(
            // And(Field, Token) means "field exists" AND "token exists anywhere" (can be different fields).
            And(
                Field("retry_count"),
                Token("error"),
            ),
            // FieldToken means the token must be present in this specific field.
            FieldToken("service", "payment"),
        ),
    ).
    Build()

// (service OR level) AND token:error
query := NewQuery().
    Match(
        And(
            Or(
                Field("service"),
                Field("level"),
            ),
            Token("error"),
        ),
    ).
    Build()

Match(...) takes a boolean expression tree built from And(...) and Or(...). Simple chained calls like .Field(...).Token(...) still default to implicit AND.

Queries can be combined with AND/OR operators and filtered by partitions and minmax indexes.
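An And/Or tree can be evaluated against any membership test: a file-level filter, a block-level filter, or the row itself. The Go sketch below uses hypothetical `expr`, `leaf`, `and`, and `or` types (not the package's unexported BloomExpression fields), with a plain set standing in for the filters. Because Bloom filters never give false negatives, a false result at the filter level safely prunes the file or block.

```go
package main

import "fmt"

// expr is a hypothetical boolean expression tree over bloom keys.
type expr interface {
	eval(mayContain func(key string) bool) bool
}

type leaf string // a single key, e.g. "retry_count" or "service:payment"
type and []expr  // true if every child is true
type or []expr   // true if any child is true

func (l leaf) eval(mayContain func(string) bool) bool { return mayContain(string(l)) }

func (a and) eval(mayContain func(string) bool) bool {
	for _, e := range a {
		if !e.eval(mayContain) {
			return false // short-circuit: one miss fails the AND
		}
	}
	return true
}

func (o or) eval(mayContain func(string) bool) bool {
	for _, e := range o {
		if e.eval(mayContain) {
			return true // short-circuit: one hit satisfies the OR
		}
	}
	return false
}

func main() {
	// (retry_count AND error) OR service:payment
	q := or{and{leaf("retry_count"), leaf("error")}, leaf("service:payment")}

	// One set mixing field, token, and field:token keys stands in for a
	// block's three filters (the real filters are kept separate).
	block := map[string]bool{"level": true, "error": true, "service:auth": true}
	fmt.Println(q.eval(func(k string) bool { return block[k] }))
}
```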

Data files

Data files are designed for single-pass writing with row groups, similar to Parquet. They include minmax indexes for quick pruning and support ClickHouse-style partitions.

Files are self-contained and immutable. Bloom filter storage overhead is amortized as row groups grow while filters remain constant size.

See FILE_FORMAT.md for details.

Partitions

Partitions enable eager pruning before bloom filter tests. Each data block belongs to one partition:

                 File Metadata
                      │
        ┌─────────────┼─────────────┐
        │             │             │
     [202301]      [202302]     [202303]
     Jan 2023      Feb 2023     Mar 2023
       logs          logs         logs

They can be specified with a PartitionFunc:

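// Partition by year-month from a millisecond Unix timestamp
func TimePartition(row map[string]any) string {
    // ConvertToInt64 accepts any numeric type (int, int64, float64, ...)
    if ts, ok := ConvertToInt64(row["timestamp"]); ok {
        return time.UnixMilli(ts).UTC().Format("200601") // YYYYMM, in UTC
    }
    return "" // no partition ID for rows without a numeric timestamp
}

config := DefaultBloomSearchEngineConfig()
config.PartitionFunc = TimePartition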

Partitions are optional. When querying with partition conditions, files without partition IDs are always included to avoid missing data.
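The rule about files without partition IDs can be expressed as a small filter. This is a sketch under assumptions: `block` and `prunePartitions` are hypothetical names, and an equality match against a set of wanted IDs stands in for the package's richer StringCondition. Blocks with an empty partition ID are always kept, because they may contain rows from any partition.

```go
package main

import "fmt"

// block is a hypothetical, trimmed-down stand-in for DataBlockMetadata.
type block struct {
	Offset      int
	PartitionID string
}

// prunePartitions keeps blocks whose partition is wanted, plus blocks
// with no partition ID (they could hold rows from any partition).
func prunePartitions(blocks []block, wanted map[string]bool) []block {
	var kept []block
	for _, b := range blocks {
		if b.PartitionID == "" || wanted[b.PartitionID] {
			kept = append(kept, b)
		}
	}
	return kept
}

func main() {
	blocks := []block{{0, "202301"}, {100, "202302"}, {200, ""}, {300, "202303"}}
	fmt.Println(prunePartitions(blocks, map[string]bool{"202301": true}))
}
```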

MinMax Indexes

Track minimum and maximum values for numeric fields, enabling range-based pruning:

config.MinMaxIndexes = []string{"timestamp", "response_time"}

// Query with range filter and bloom conditions
query := NewQuery().
    MatchPrefilter(
        PrefilterAnd(
            MinMax("timestamp", NumericBetween(start, end)),
            MinMax("response_time", NumericLessThan(1000)),
        ),
    ).
    FieldToken("level", "error").
    Build()

Use MatchPrefilter(...) with PrefilterAnd(...) / PrefilterOr(...) for prefilter logic.

MinMax indexes are optional. When querying with range conditions, files without minmax indexes are always included to avoid missing data.
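Range pruning only needs to know whether a block's [Min, Max] interval could overlap the query. The sketch below mirrors the documented behavior of ConvertToMinMaxInt64 (floats widen to Floor/Ceil) and EvaluateMinMaxCondition (keep blocks that might match) for two operators. The names `minMax`, `indexValue`, `extend`, `overlapsBetween`, and `overlapsLessThan` are hypothetical, not the package's code.

```go
package main

import (
	"fmt"
	"math"
)

// minMax is a hypothetical stand-in for MinMaxIndex.
type minMax struct{ Min, Max int64 }

// indexValue widens a value to an integer interval: integers map to
// [v, v], floats to [floor(v), ceil(v)] so the true value stays inside.
func indexValue(v any) (minMax, bool) {
	switch x := v.(type) {
	case int:
		return minMax{int64(x), int64(x)}, true
	case int64:
		return minMax{x, x}, true
	case float64:
		return minMax{int64(math.Floor(x)), int64(math.Ceil(x))}, true
	}
	return minMax{}, false // non-numeric values are not indexed
}

// extend grows an index so it also covers o.
func (m minMax) extend(o minMax) minMax {
	if o.Min < m.Min {
		m.Min = o.Min
	}
	if o.Max > m.Max {
		m.Max = o.Max
	}
	return m
}

// overlapsBetween reports whether some value in the block could lie in [lo, hi].
func overlapsBetween(m minMax, lo, hi int64) bool { return m.Min <= hi && m.Max >= lo }

// overlapsLessThan reports whether some value in the block could be < v.
func overlapsLessThan(m minMax, v int64) bool { return m.Min < v }

func main() {
	idx, _ := indexValue(812.4) // [812, 813]
	more, _ := indexValue(1500)
	idx = idx.extend(more) // [812, 1500]
	fmt.Println(idx, overlapsLessThan(idx, 1000), overlapsBetween(idx, 2000, 3000))
}
```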

Merging

Merging files reduces metadata operations (file opens, bloom filter tests) and improves query performance.

Bloom filters with identical parameters (bit-array size and hash functions) can be trivially merged by OR-ing their bits. If the bloom filter parameters differ, the system rebuilds the filters from raw data during the merge.
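Merging filters with identical parameters is a word-by-word bitwise OR: any bit set in either input is set in the output, so every item added to either filter still tests positive. A minimal Go sketch over raw bit words (the `orMerge` helper is hypothetical; Bloom filter libraries often provide their own merge method):

```go
package main

import (
	"errors"
	"fmt"
)

// orMerge is a hypothetical helper that unions two bloom filter bit arrays.
// The caller must ensure both filters also use the same hash functions.
func orMerge(a, b []uint64) ([]uint64, error) {
	if len(a) != len(b) {
		return nil, errors.New("bloom filters have different sizes; rebuild from raw data instead")
	}
	out := make([]uint64, len(a))
	for i := range a {
		out[i] = a[i] | b[i]
	}
	return out, nil
}

func main() {
	merged, err := orMerge([]uint64{0b0011, 0}, []uint64{0b0101, 1 << 63})
	fmt.Println(merged, err)
}
```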

Two files are considered mergeable if they have the same file-level bloom filter parameters and their combined size is still under the max file size threshold.

Once two files have been selected for merging, the algorithm considers whether row groups within the files can be merged. Row groups are mergeable if they share the same partition ID, have the same bloom filter parameters, and their combined size is under the max row group size (the number of rows is ignored here, since it matters less than it does when memory-buffering).
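The two mergeability checks can be written as predicates. This is a hedged sketch: `bloomParams`, `groupMeta`, `filesMergeable`, and `rowGroupsMergeable` are hypothetical stand-ins modeled on DataBlockMetadata and the MaxFileSize / MaxRowGroupBytes config fields, not the engine's actual merge planner. "Under the threshold" is treated as "at or under" here.

```go
package main

import "fmt"

// bloomParams identifies filters that can be OR-merged (hypothetical).
type bloomParams struct {
	ExpectedItems     uint
	FalsePositiveRate float64
}

// groupMeta is a hypothetical, trimmed-down stand-in for DataBlockMetadata.
type groupMeta struct {
	PartitionID string
	Bloom       bloomParams
	Size        int // bytes
	Rows        int // deliberately ignored by rowGroupsMergeable
}

// filesMergeable: same file-level bloom parameters and combined size within the limit.
func filesMergeable(aBloom, bBloom bloomParams, aSize, bSize, maxFileSize int) bool {
	return aBloom == bBloom && aSize+bSize <= maxFileSize
}

// rowGroupsMergeable: same partition, same bloom parameters, combined bytes within the limit.
func rowGroupsMergeable(a, b groupMeta, maxRowGroupBytes int) bool {
	return a.PartitionID == b.PartitionID &&
		a.Bloom == b.Bloom &&
		a.Size+b.Size <= maxRowGroupBytes
}

func main() {
	p := bloomParams{ExpectedItems: 100000, FalsePositiveRate: 0.001}
	a := groupMeta{PartitionID: "202301", Bloom: p, Size: 4 << 20, Rows: 10}
	b := groupMeta{PartitionID: "202301", Bloom: p, Size: 3 << 20, Rows: 50000}
	c := groupMeta{PartitionID: "202302", Bloom: p, Size: 1 << 20}
	fmt.Println(rowGroupsMergeable(a, b, 8<<20), rowGroupsMergeable(a, c, 8<<20))
	fmt.Println(filesMergeable(p, p, 60<<20, 50<<20, 100<<20))
}
```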

All merging is streamed to keep memory usage low:

  1. The new file is created in the DataStore.
  2. Row groups selected for merging are decompressed and rewritten together (so different compression settings are supported and consolidated), and their bloom filters are merged. Row groups that are not merged are copied in as-is, without decompressing.
  3. Row group metadata (minmax indexes, number of rows, etc.) is merged and added to the running file metadata.
  4. The final file metadata is created by merging all file-level bloom filters, and is written out to the new file.
  5. The MetaStore receives an update that atomically creates the new file and deletes all the old files.

Coordinated Merges (issue)

Multiple concurrent writers need coordination to avoid conflicts. A CoordinatedMetaStore can expose lease methods, enabling multiple writers and background merge processes to work together safely.

TTLs

TTL uses the same merging mechanism to drop expired data. Configure TTL conditions based on partition ID, minmax indexes, or row group age. Expired row groups and files are dropped during merge.

TTLs are optional.

DataStore

Pluggable interface for file storage with two methods:

type DataStore interface {
    CreateFile(ctx context.Context) (io.WriteCloser, []byte, error)
    OpenFile(ctx context.Context, filePointerBytes []byte) (io.ReadSeekCloser, error)
}

The filePointerBytes value abstracts the storage location (file path, S3 bucket/key, etc.) and is stored in the MetaStore for later retrieval. This enables storage backends such as the filesystem, S3, GCS, etc.

MetaStore

Handles file metadata storage and query pre-filtering:

type MetaStore interface {
    GetMaybeFilesForQuery(ctx context.Context, query *QueryPrefilter) ([]MaybeFile, error)
    Update(ctx context.Context, writes []WriteOperation, deletes []DeleteOperation) error
}

The MetaStore can be the same object as the DataStore (e.g., FileSystemDataStore) or a separate store for performance.

Advanced implementations using databases can pre-filter partition IDs and minmax indexes, reducing bloom filter tests.

Write path
┌─────────────┐    ┌─────────────────┐    ┌──────────────┐
│1. Ingest    │ ──►│2. Buffer        │ ──►│3. Flush      │
│   Rows      │    │   • Partitions  │    │   • Create   │
│             │    │   • Bloom       │    │     file     │
│             │    │   • MinMax      │    │   • Stream   │
└─────────────┘    └─────────────────┘    │     blocks   │
                                          └──────┬───────┘
                                                 │
                                          ┌──────▼───────┐
                                          │4. Finalize   │
                                          │   • Metadata │
                                          │   • Update   │
                                          │     stores   │
                                          └──────────────┘

Configurable flush triggers: row count, byte size, or time-based.

Buffering is done in a single goroutine to avoid lock contention; at flush time, that goroutine spawns a dedicated goroutine to write out the buffers. This means flushing does not block ingestion.

Query path

Query flow for field, token, or field:token combinations:

┌─────────────┐    ┌─────────────────┐    ┌──────────────┐
│1. Build     │ ──►│2. Pre-filter    │ ──►│3. Bloom Test │
│   Query     │    │   (MetaStore)   │    │ (file-level) │
│             │    │                 │    │              │
└─────────────┘    └─────────────────┘    └──────┬───────┘
                                                 │
                                                 ▼
┌─────────────┐    ┌─────────────────┐    ┌──────────────┐
│6. Row       │ ◄──│5. Bloom Test    │ ◄──│4. Stream     │
│   Scan      │    │   (block-level) │    │   Blocks     │
│             │    │                 │    │              │
└─────────────┘    └─────────────────┘    └──────────────┘
// Example query combining prefiltering with bloom search
query := NewQuery().
    MatchPrefilter(
        PrefilterOr(
            Partition(PartitionEquals("202301")),
            MinMax("timestamp", NumericBetween(start, end)),
        ),
    ).
    Field("user_id").Token("error").
    Build()

maybeFiles, err := metaStore.GetMaybeFilesForQuery(ctx, query.Prefilter)

Query processing is highly concurrent: a goroutine is spawned for every file (when more than 20 candidate files are returned) and for every row group. This lets queries make full use of multi-core machines.

Memory usage scales with concurrent file reads, not dataset size.

This flow is simplified; see BloomSearchEngine.Query for more detail.

Note that BloomSearchEngine.Query takes a resultChan and an errorChan. This is because each row group processor reads its row group one row at a time, so matches can be streamed back to the caller as they are found.

This enables processing of arbitrarily large results as well.

When the resultChan closes, there are no more active row group processors, and the caller can exit.

Distributed Query Processing (issue)

Query processing naturally decomposes into independent row group tasks that can be distributed across multiple nodes. Since results are streamed back asynchronously without ordering guarantees, this creates a perfectly parallelizable workload.

Distributed query processing extends the existing path like this:

┌──────────┐     ┌──────────────┐     ┌───────────┐     ┌─────────────┐     ┌─────────────┐
│1. Build  │ ──► │2. Pre-filter │ ──► │3. Scatter │ ──► │4. Peers     │ ──► │5. Stream    │
│   Query  │     │   MetaStore  │     │   Work to │ ──► │   Process   │ ──► │   Results   │
│          │     │              │     │    Peers  │ ──► │  Row Groups │ ──► │   Back to   │
└──────────┘     └──────────────┘     └───────────┘     └─────────────┘     │ Coordinator │
                                                                            └─────────────┘
  1. Build Query - Coordinator constructs the query with bloom conditions and prefilters
  2. Pre-filter MetaStore - Coordinator identifies candidate files using partition and MinMax indexes where possible
  3. Scatter Work to Peers - Coordinator distributes row group processing tasks across available peers
  4. Peers Process Row Groups - Each peer performs bloom filter tests and row scanning independently
  5. Stream Results Back to Coordinator - Peers stream matching rows directly to the coordinator via unique query IDs

Peer discovery uses gossip protocol for fault tolerance, while work assignment prioritizes peers with available capacity. Each peer maintains its own connection to the coordinator for result streaming, enabling horizontal scaling without central bottlenecks.

Performance

See PERFORMANCE.md

Contributing

Do not submit random PRs; they will be closed.

For feature requests and bugs, create an Issue.

For questions, create a Discussion.

AI Code

As a disclaimer: much of this codebase was written by Claude 4 Sonnet using Cursor.

I normally use GoLand, and I miss many of its features that fill massive gaps in the Go linter/compiler, like telling me what's needed to implement an interface.

All code has been carefully reviewed, and tests have been written, to ensure validity and that it is of the quality that I would write myself.

The common pattern I used is:

  1. Define a clear spec (with a todo list)
  2. Have it build that
  3. Have it write tests to my spec that check edge cases and verify robustness
  4. Have it simplify the code and find consolidation and code reusability opportunities - an example can be seen in this commit

Documentation

Constants

const (
	FileVersion = uint32(1)
	MagicBytes  = "BLOMSRCH"

	LengthPrefixSize  = 4
	VersionPrefixSize = 4
	HashSize          = 4
)

File format constants

Variables

var (
	ErrInvalidConfig = errors.New("invalid configuration")
)
var (
	ErrInvalidHash = errors.New("invalid hash")
)

Functions

func BasicWhitespaceLowerTokenizer

func BasicWhitespaceLowerTokenizer(value any) []string

BasicWhitespaceLowerTokenizer tokenizes values by splitting on whitespace and converting to lowercase

func ConvertToInt64

func ConvertToInt64(value any) (int64, bool)

ConvertToInt64 converts any numeric value to int64. For floats, it rounds to the nearest integer. Returns false if the value is not a numeric type.

func ConvertToMinMaxInt64

func ConvertToMinMaxInt64(value any) (minVal int64, maxVal int64, ok bool)

ConvertToMinMaxInt64 converts any numeric value to int64 min/max values. For integers, min and max are the same. For floats, min uses Floor and max uses Ceil. Returns false if the value is not a numeric type.

func EvaluateDataBlockMetadata

func EvaluateDataBlockMetadata(metadata *DataBlockMetadata, query *QueryPrefilter) bool

EvaluateDataBlockMetadata checks if a DataBlockMetadata matches the prefilter expression.

func EvaluateMinMaxCondition

func EvaluateMinMaxCondition(minMaxIndex MinMaxIndex, condition NumericCondition) bool

EvaluateMinMaxCondition checks if a MinMaxIndex overlaps with the given condition. This is used for range-based filtering, where we want to include data blocks that might contain matching values.

func EvaluateNumericCondition

func EvaluateNumericCondition(value int64, condition NumericCondition) bool

EvaluateNumericCondition checks if a numeric value matches the given condition

func EvaluateStringCondition

func EvaluateStringCondition(value string, condition StringCondition) bool

EvaluateStringCondition checks if a string value matches the given condition

func FormatBytesPerSecond

func FormatBytesPerSecond(bytes int64, duration time.Duration) string

FormatBytesPerSecond formats bytes per second into human-readable format (B/s, KB/s, MB/s, GB/s, TB/s)

func FormatRate

func FormatRate(count int64, duration time.Duration) string

FormatRate formats rate per second

func SendWithContext

func SendWithContext[T any](ctx context.Context, ch chan<- T, value T) error

SendWithContext attempts to send a value to a channel while respecting context cancellation. Returns an error if the context is done before the send completes.

func TestGJSONForField

func TestGJSONForField(value gjson.Result, fieldPath, delimiter string) bool

Byte-based helpers that operate on a pre-parsed gjson.Result

func TestGJSONForFieldToken

func TestGJSONForFieldToken(value gjson.Result, fieldPath, delimiter, token string, tokenizer ValueTokenizerFunc) bool

func TestGJSONForToken

func TestGJSONForToken(value gjson.Result, token string, tokenizer ValueTokenizerFunc) bool

func TestJSONForBloomCondition

func TestJSONForBloomCondition(jsonBytes []byte, condition *BloomCondition, delimiter string, tokenizer ValueTokenizerFunc) bool

TestJSONForBloomCondition tests a JSON string against a bloom condition using gjson

func TestJSONForBloomQuery

func TestJSONForBloomQuery(jsonBytes []byte, bloomQuery *BloomQuery, delimiter string, tokenizer ValueTokenizerFunc) bool

TestJSONForBloomQuery tests a JSON string against a bloom query using gjson

func TestJSONForField

func TestJSONForField(jsonStr, fieldPath, delimiter string) bool

TestJSONForField tests if a field path exists in JSON, handling arrays by walking them

func TestJSONForFieldToken

func TestJSONForFieldToken(jsonStr, fieldPath, delimiter, token string, tokenizer ValueTokenizerFunc) bool

TestJSONForFieldToken tests if a specific field contains a specific token, handling arrays by walking them

func TestJSONForToken

func TestJSONForToken(jsonStr, token string, tokenizer ValueTokenizerFunc) bool

TestJSONForToken tests if a token exists in any field value, handling arrays by walking them

func TryWriteChannel

func TryWriteChannel[T any](ch chan<- T, value T) bool

TryWriteChannel attempts to write a value to a channel without blocking. Returns true if the write was successful, false if the channel is full, nobody is listening, or the channel is nil.

func TryWriteToChannels

func TryWriteToChannels[T any](channels []chan T, value T)

TryWriteToChannels sends a value to all channels using TryWriteChannel
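Non-blocking sends in Go use a select with a default case. These hypothetical `tryWriteChannel` and `tryWriteToChannels` functions are consistent with the documented behavior: a full buffer, an unbuffered channel with no ready receiver, and a nil channel (a send on nil never proceeds) all fall through to default and return false. Note that this pattern does not guard against a closed channel, where the send would still panic.

```go
package main

import "fmt"

// tryWriteChannel (hypothetical) sends without blocking; false means the send did not happen.
func tryWriteChannel[T any](ch chan<- T, value T) bool {
	select {
	case ch <- value:
		return true
	default:
		return false
	}
}

// tryWriteToChannels (hypothetical) offers value to every channel without blocking.
func tryWriteToChannels[T any](channels []chan T, value T) {
	for _, ch := range channels {
		tryWriteChannel(ch, value)
	}
}

func main() {
	buffered := make(chan string, 1)
	unbuffered := make(chan string)
	var nilChan chan string
	fmt.Println(tryWriteChannel(buffered, "a"), tryWriteChannel(buffered, "b"))
	fmt.Println(tryWriteChannel(unbuffered, "c"), tryWriteChannel(nilChan, "d"))
	tryWriteToChannels([]chan string{buffered, make(chan string, 1)}, "e")
}
```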

Types

type BlockStats

type BlockStats struct {
	FilePointer        []byte
	BlockOffset        int
	RowsProcessed      int64
	BytesProcessed     int64
	TotalRows          int64
	TotalBytes         int64
	Duration           time.Duration
	BloomFilterSkipped bool
}

type BloomCondition

type BloomCondition struct {
	Type  BloomConditionType
	Field string // for FIELD and FIELD_TOKEN
	Token string // for TOKEN and FIELD_TOKEN
}

type BloomConditionType

type BloomConditionType string
const (
	BloomField      BloomConditionType = "FIELD"
	BloomToken      BloomConditionType = "TOKEN"
	BloomFieldToken BloomConditionType = "FIELD_TOKEN"
)

type BloomExpression

type BloomExpression struct {
	// contains filtered or unexported fields
}

func And

func And(expressions ...BloomExpression) BloomExpression

func Field

func Field(field string) BloomExpression

func FieldToken

func FieldToken(field, token string) BloomExpression

func Or

func Or(expressions ...BloomExpression) BloomExpression

func Token

func Token(token string) BloomExpression

type BloomExpressionType

type BloomExpressionType string

type BloomFilters

type BloomFilters struct {
	FieldBloomFilter      *bloom.BloomFilter
	TokenBloomFilter      *bloom.BloomFilter
	FieldTokenBloomFilter *bloom.BloomFilter
}

BloomFilters contains the bloom filters for a data block. This struct is serialized and stored at the beginning of each data block.

func DataBlockBloomFiltersFromBytesWithHash

func DataBlockBloomFiltersFromBytesWithHash(bytes []byte, expectedHashBytes []byte) (*BloomFilters, error)

func ReadDataBlockBloomFilters

func ReadDataBlockBloomFilters(file io.ReadSeeker, blockMetadata DataBlockMetadata) (*BloomFilters, error)

ReadDataBlockBloomFilters reads bloom filters from a data block given a file reader and block metadata

func (*BloomFilters) Bytes

func (d *BloomFilters) Bytes() ([]byte, []byte)

Returns the data block bloom filters as a byte slice and the CRC32C of the bloom filters

type BloomQuery

type BloomQuery struct {
	Expression *BloomExpression `json:",omitempty"`
}

type BloomSearchEngine

type BloomSearchEngine struct {
	// contains filtered or unexported fields
}

func NewBloomSearchEngine

func NewBloomSearchEngine(config BloomSearchEngineConfig, metaStore MetaStore, dataStore DataStore) (*BloomSearchEngine, error)

func (*BloomSearchEngine) Flush

func (b *BloomSearchEngine) Flush(ctx context.Context) error

Flush forces a flush of any buffered data

func (*BloomSearchEngine) IngestRows

func (b *BloomSearchEngine) IngestRows(ctx context.Context, rows []map[string]any, doneChan chan error) error

IngestRows queues rows for ingestion by the actor

func (*BloomSearchEngine) Merge

func (b *BloomSearchEngine) Merge(ctx context.Context) (*MergeStats, error)

Merge executes file merging to optimize storage and query performance

func (*BloomSearchEngine) Query

func (b *BloomSearchEngine) Query(ctx context.Context, query *Query, resultChan chan<- map[string]any, errorChan chan<- error, statsChan chan<- BlockStats) error

Query executes a query and sends results to the provided channels. The result channel feeds individual matching rows. Canceling the context will stop all workers. When the result channel closes, no more work is happening.

The error channel is written to for any errors that occur per-worker. If a worker writes to this channel, it has stopped processing.

Example usage:

resultChan := make(chan map[string]any, 1000)
errorChan := make(chan error, 100)
err := engine.Query(ctx, query, resultChan, errorChan, nil)
if err != nil { return err }
for {
  select {
  case <-ctx.Done():
    return ctx.Err()
  case row, ok := <-resultChan:
    if !ok { return nil } // done
    // process row
  case err := <-errorChan:
    return err
  }
}

func (*BloomSearchEngine) Start

func (b *BloomSearchEngine) Start()

Start begins the ingestion and flush workers

func (*BloomSearchEngine) Stop

func (b *BloomSearchEngine) Stop(ctx context.Context) error

Stop gracefully shuts down the engine with a timeout

type BloomSearchEngineConfig

type BloomSearchEngineConfig struct {
	Tokenizer     ValueTokenizerFunc
	PartitionFunc PartitionFunc

	MinMaxIndexes []string

	MaxRowGroupBytes int
	MaxRowGroupRows  int
	MaxFileSize      int

	MaxBufferedRows  int
	MaxBufferedBytes int
	MaxBufferedTime  time.Duration

	IngestBufferSize int

	// The maximum number of total data blocks that can be processed concurrently across all queries
	MaxQueryConcurrency int

	// Bloom filter parameters
	FileBloomExpectedItems uint
	BloomFalsePositiveRate float64

	// Compression configuration
	RowDataCompression CompressionType

	// Compression level for zstd (1-22, higher = better compression, slower)
	// Ignored for snappy compression
	ZstdCompressionLevel int

	// Maximum number of files to merge together in a single merge operation
	MaxFilesToMergePerOperation int
}

func DefaultBloomSearchEngineConfig

func DefaultBloomSearchEngineConfig() BloomSearchEngineConfig

type Combinator

type Combinator string

Combinator specifies how conditions should be combined

const (
	CombinatorAND Combinator = "AND"
	CombinatorOR  Combinator = "OR"
)

type CompressionType

type CompressionType string

CompressionType represents the compression algorithm used for row data

const (
	CompressionNone   CompressionType = "none"
	CompressionSnappy CompressionType = "snappy"
	CompressionZstd   CompressionType = "zstd"
)

type DataBlockMetadata

type DataBlockMetadata struct {
	// Absolute file offset (includes bloom filters at the beginning)
	Offset int

	// Size includes the bloom filters, their hash, and row data (no trailing hash)
	Size int
	Rows int

	// Size of the bloom filters section (bloom filters + hash)
	BloomFiltersSize int

	MinMaxIndexes map[string]MinMaxIndex `json:",omitempty"`
	PartitionID   string                 `json:",omitempty"`

	// Compression algorithm used for the row data in this block
	Compression CompressionType `json:",omitempty"`

	// Uncompressed size of row data (for decompression buffer allocation)
	UncompressedSize int `json:",omitempty"`

	// Hash of the compressed row data (for integrity verification)
	RowDataHash uint32 `json:",omitempty"`

	BloomExpectedItems     uint
	BloomFalsePositiveRate float64
}

func FilterDataBlocks

func FilterDataBlocks(blocks []DataBlockMetadata, query *QueryPrefilter) []DataBlockMetadata

FilterDataBlocks filters a slice of DataBlockMetadata based on query conditions

type DataStore

type DataStore interface {
	// CreateFile creates a file for single-pass writing, returning the handle for writing and the file pointer bytes.
	CreateFile(ctx context.Context) (io.WriteCloser, []byte, error)

	// OpenFile opens a file for reading.
	OpenFile(ctx context.Context, filePointerBytes []byte) (io.ReadSeekCloser, error)
}

DataStore is used to read and write the file from storage.

filePointerBytes is the serialized file pointer that is passed to the DataStore to open the file for reading, and stored within the MetaStore. For example, for an S3DataStore, this might be a serialized JSON object of the bucket and file key.

type DeleteOperation

type DeleteOperation struct {
	FilePointerBytes []byte
}

type FieldValues

type FieldValues struct {
	Path   string
	Values []string
}

FieldValues represents a field path and its associated values

func UniqueFields

func UniqueFields(row map[string]any, delimiter string) []FieldValues

UniqueFields extracts all unique field paths from a nested map structure using the specified delimiter, returning tuples of (field_name, []values) where values are deduplicated per field. Arrays are traversed but indices are ignored, so duplicate paths from array elements are deduplicated.

Example:

{"user": {"name": "John", "tags": [{"type": "user"}, {"role": "admin"}]}}

Returns:

[{Path: "user.name", Values: ["John"]}, {Path: "user.tags.type", Values: ["user"]}, {Path: "user.tags.role", Values: ["admin"]}] (with delimiter ".")
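The flattening rule (nested objects join keys with the delimiter; array elements are walked but their indices are dropped; values are deduplicated per path) can be sketched recursively. This `uniqueFields` is a hypothetical reimplementation for illustration. It stringifies scalars with fmt.Sprint and sorts paths for deterministic output, both of which are assumptions rather than the package's documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// fieldValues is a hypothetical stand-in for FieldValues.
type fieldValues struct {
	Path   string
	Values []string
}

func uniqueFields(row map[string]any, delimiter string) []fieldValues {
	values := map[string][]string{}
	seen := map[string]bool{} // "path\x00value" pairs already recorded
	var walk func(prefix string, v any)
	walk = func(prefix string, v any) {
		switch x := v.(type) {
		case map[string]any:
			for k, child := range x {
				path := k
				if prefix != "" {
					path = prefix + delimiter + k
				}
				walk(path, child)
			}
		case []any:
			for _, elem := range x {
				walk(prefix, elem) // array indices are not part of the path
			}
		default:
			s := fmt.Sprint(x) // assumed scalar stringification
			if key := prefix + "\x00" + s; !seen[key] {
				seen[key] = true
				values[prefix] = append(values[prefix], s)
			}
		}
	}
	walk("", row)

	paths := make([]string, 0, len(values))
	for p := range values {
		paths = append(paths, p)
	}
	sort.Strings(paths) // deterministic order; map iteration is random
	out := make([]fieldValues, 0, len(paths))
	for _, p := range paths {
		out = append(out, fieldValues{Path: p, Values: values[p]})
	}
	return out
}

func main() {
	row := map[string]any{"user": map[string]any{
		"name": "John",
		"tags": []any{map[string]any{"type": "user"}, map[string]any{"role": "admin"}},
	}}
	fmt.Printf("%+v\n", uniqueFields(row, "."))
}
```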

type FileMetadata

type FileMetadata struct {
	BloomFilters           BloomFilters
	BloomExpectedItems     uint
	BloomFalsePositiveRate float64

	DataBlocks []DataBlockMetadata
}

func FileMetadataFromBytesWithHash

func FileMetadataFromBytesWithHash(bytes []byte, expectedHashBytes []byte) (*FileMetadata, error)

func (*FileMetadata) Bytes

func (f *FileMetadata) Bytes() ([]byte, []byte)

Returns the file metadata as a byte slice and the CRC32C of the file metadata

type FileSystemDataStore

type FileSystemDataStore struct {
	// contains filtered or unexported fields
}

func NewFileSystemDataStore

func NewFileSystemDataStore(rootDir string) *FileSystemDataStore

func (*FileSystemDataStore) CreateFile

func (fs *FileSystemDataStore) CreateFile(ctx context.Context) (io.WriteCloser, []byte, error)

func (*FileSystemDataStore) GetMaybeFilesForQuery

func (fs *FileSystemDataStore) GetMaybeFilesForQuery(ctx context.Context, query *QueryPrefilter) ([]MaybeFile, error)

func (*FileSystemDataStore) OpenFile

func (fs *FileSystemDataStore) OpenFile(ctx context.Context, filePointerBytes []byte) (io.ReadSeekCloser, error)

func (*FileSystemDataStore) Update

func (fs *FileSystemDataStore) Update(ctx context.Context, writes []WriteOperation, deletes []DeleteOperation) error

type FileSystemDataStoreFilePointer

type FileSystemDataStoreFilePointer struct {
	ID string
}

type MaybeFile

type MaybeFile struct {
	// The file pointer is serialized to bytes and passed to the DataStore to open the file for reading.
	PointerBytes []byte
	// The FileMetadata.DataBlocks may choose to be a filtered list instead of the full list of data blocks
	Metadata FileMetadata
	// The size of the file in bytes
	Size int
}

MaybeFile is a pointer to a file that may contain rows of interest based on pre-filtering conditions (partition IDs, minmax indexes). They have not had their bloom filters tested yet.

type MemoryMetaStore

type MemoryMetaStore struct {
	// contains filtered or unexported fields
}

MemoryMetaStore is a simple map-based implementation for testing

func NewSimpleMetaStore

func NewSimpleMetaStore() *MemoryMetaStore

func (*MemoryMetaStore) GetMaybeFilesForQuery

func (s *MemoryMetaStore) GetMaybeFilesForQuery(ctx context.Context, prefilter *QueryPrefilter) ([]MaybeFile, error)

GetMaybeFilesForQuery implements the MetaStore interface

func (*MemoryMetaStore) PrintFiles

func (s *MemoryMetaStore) PrintFiles()

PrintFiles prints all files in the metastore for debugging

func (*MemoryMetaStore) Update

func (s *MemoryMetaStore) Update(ctx context.Context, writeOps []WriteOperation, deleteOps []DeleteOperation) error

Update implements the MetaStore interface

type MergeStats

type MergeStats struct {
	FilesProcessed     int64
	RowGroupsProcessed int64
	RowsProcessed      int64
	BytesProcessed     int64
	Duration           time.Duration
	RowsPerSecond      float64
	BytesPerSecond     float64
}

type MetaStore

type MetaStore interface {
	// GetMaybeFilesForQuery returns pointers to files that may contain rows of interest based on the query conditions.
	// The returned files have already been pre-filtered based on partition IDs and MinMaxIndex conditions,
	// but their bloom filters have not been tested yet.
	//
	// If the query specifies partition ID or MinMax index conditions, but the file does not have them,
	// the file must be included in the result set, as it may have rows of interest.
	//
	// The MaybeFile.Metadata.DataBlocks may choose to be a filtered list instead of the full list of data blocks
	// if the query conditions are able to guarantee that some data blocks will not match the query conditions.
	GetMaybeFilesForQuery(ctx context.Context, query *QueryPrefilter) ([]MaybeFile, error)

	// Update atomically performs a set of operations on the MetaStore.
	Update(ctx context.Context, writes []WriteOperation, deletes []DeleteOperation) error
}

MetaStore is a generic interface for a metadata store that can be used to store and retrieve file and data block metadata.

FilePointer is a pointer to a file in the metadata store, depending on the implementation of the MetaStore and DataStore.

type MinMaxIndex

type MinMaxIndex struct {
	Min int64
	Max int64
}

func UpdateMinMaxIndex

func UpdateMinMaxIndex(existing MinMaxIndex, newMin, newMax int64) MinMaxIndex

UpdateMinMaxIndex updates an existing MinMaxIndex with new min/max values.

type NullDataStore

type NullDataStore struct{}

TESTING

func (*NullDataStore) CreateFile

func (n *NullDataStore) CreateFile(ctx context.Context) (io.WriteCloser, []byte, error)

func (*NullDataStore) OpenFile

func (n *NullDataStore) OpenFile(ctx context.Context, filePointerBytes []byte) (io.ReadSeekCloser, error)

type NullDataStoreFilePointer

type NullDataStoreFilePointer struct {
	ID string
}

type NullMetaStore

type NullMetaStore struct{}

func (*NullMetaStore) GetMaybeFilesForQuery

func (n *NullMetaStore) GetMaybeFilesForQuery(ctx context.Context, query *QueryPrefilter) ([]MaybeFile, error)

func (*NullMetaStore) Update

func (n *NullMetaStore) Update(ctx context.Context, writes []WriteOperation, deletes []DeleteOperation) error

type NumericCondition

type NumericCondition struct {
	Operator QueryOperator `json:",omitempty"` // for EQ, NE, GT, GTE, LT, LTE
	Value    int64         `json:",omitempty"` // for EQ, NE, GT, GTE, LT, LTE
	Values   []int64       `json:",omitempty"` // for IN, NOT_IN
	Min      int64         `json:",omitempty"` // for BETWEEN, NOT_BETWEEN
	Max      int64         `json:",omitempty"` // for BETWEEN, NOT_BETWEEN
}

NumericCondition represents a condition on numeric values (like MinMaxIndex)

func NumericBetween

func NumericBetween(min, max int64) NumericCondition

NumericBetween creates a numeric BETWEEN condition (inclusive)

func NumericEquals

func NumericEquals(value int64) NumericCondition

NumericEquals creates a numeric equality condition

func NumericGreaterThan

func NumericGreaterThan(value int64) NumericCondition

NumericGreaterThan creates a numeric greater than condition

func NumericGreaterThanEqual

func NumericGreaterThanEqual(value int64) NumericCondition

NumericGreaterThanEqual creates a numeric greater than or equal condition

func NumericIn

func NumericIn(values ...int64) NumericCondition

NumericIn creates a numeric IN condition

func NumericLessThan

func NumericLessThan(value int64) NumericCondition

NumericLessThan creates a numeric less than condition

func NumericLessThanEqual

func NumericLessThanEqual(value int64) NumericCondition

NumericLessThanEqual creates a numeric less than or equal condition

func NumericNotBetween

func NumericNotBetween(min, max int64) NumericCondition

NumericNotBetween creates a numeric NOT BETWEEN condition (exclusive)

func NumericNotEquals

func NumericNotEquals(value int64) NumericCondition

NumericNotEquals creates a numeric not equal condition

func NumericNotIn

func NumericNotIn(values ...int64) NumericCondition

NumericNotIn creates a numeric NOT IN condition
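A MetaStore pre-filters files whose MinMaxIndex cannot satisfy the query's numeric conditions. The following is a minimal sketch of such a range check, assuming each operator reads the fields noted in the NumericCondition struct comments. The lowercase types mirror the library's, and `mayContain` is a hypothetical helper, not part of bloomsearch. It returns false only when no value in the index range can match, so pruning stays safe.

```go
package main

import "fmt"

type queryOperator string

const (
	opEqual            queryOperator = "EQ"
	opNotEqual         queryOperator = "NE"
	opGreaterThan      queryOperator = "GT"
	opGreaterThanEqual queryOperator = "GTE"
	opLessThan         queryOperator = "LT"
	opLessThanEqual    queryOperator = "LTE"
	opIn               queryOperator = "IN"
	opNotIn            queryOperator = "NOT_IN"
	opBetween          queryOperator = "BETWEEN"
	opNotBetween       queryOperator = "NOT_BETWEEN"
)

// numericCondition mirrors the fields of bloomsearch's NumericCondition.
type numericCondition struct {
	Operator queryOperator
	Value    int64
	Values   []int64
	Min, Max int64
}

type minMaxIndex struct{ Min, Max int64 }

// mayContain reports whether some value in [idx.Min, idx.Max] could
// satisfy cond. false means the file or block can be skipped.
func mayContain(idx minMaxIndex, cond numericCondition) bool {
	switch cond.Operator {
	case opEqual:
		return idx.Min <= cond.Value && cond.Value <= idx.Max
	case opNotEqual:
		// Only a range holding exactly one value, equal to Value, is excluded.
		return !(idx.Min == cond.Value && idx.Max == cond.Value)
	case opGreaterThan:
		return idx.Max > cond.Value
	case opGreaterThanEqual:
		return idx.Max >= cond.Value
	case opLessThan:
		return idx.Min < cond.Value
	case opLessThanEqual:
		return idx.Min <= cond.Value
	case opIn:
		for _, v := range cond.Values {
			if idx.Min <= v && v <= idx.Max {
				return true
			}
		}
		return false
	case opBetween:
		// Inclusive range overlap.
		return idx.Min <= cond.Max && cond.Min <= idx.Max
	case opNotBetween:
		// Prunable only if the whole index lies inside [Min, Max].
		return !(cond.Min <= idx.Min && idx.Max <= cond.Max)
	case opNotIn:
		// Pruning would need Values to cover every integer in the range.
		return true
	default:
		return true // unknown operator: keep the file (conservative)
	}
}

func main() {
	idx := minMaxIndex{Min: 1000, Max: 2000}
	fmt.Println(mayContain(idx, numericCondition{Operator: opBetween, Min: 1500, Max: 2500})) // true
	fmt.Println(mayContain(idx, numericCondition{Operator: opGreaterThan, Value: 2000}))        // false
	fmt.Println(mayContain(idx, numericCondition{Operator: opIn, Values: []int64{5, 3000}}))   // false
}
```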

type PartitionFunc

type PartitionFunc func(row map[string]any) string
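A PartitionFunc maps each row to a partition ID string. As an illustration, the hypothetical `byDay` function below buckets rows by the UTC day of a `timestamp` field holding Unix milliseconds. The field name, the millisecond unit, and the use of `""` for rows without a timestamp are all assumptions of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// partitionFunc has the same shape as bloomsearch's PartitionFunc.
type partitionFunc func(row map[string]any) string

// byDay is a hypothetical PartitionFunc: it formats the row's
// "timestamp" (Unix ms) as a UTC date, or returns "" if it is missing.
func byDay(row map[string]any) string {
	var ms int64
	switch v := row["timestamp"].(type) {
	case int64:
		ms = v
	case float64: // numbers decoded from JSON arrive as float64
		ms = int64(v)
	default:
		return ""
	}
	return time.UnixMilli(ms).UTC().Format("2006-01-02")
}

func main() {
	var pf partitionFunc = byDay
	fmt.Println(pf(map[string]any{"timestamp": int64(1700000000000)})) // 2023-11-14
	fmt.Println(pf(map[string]any{"level": "error"}) == "")            // true
}
```

A fixed-width `YYYY-MM-DD` format sorts lexicographically in chronological order, which is what makes string range conditions such as PartitionBetween meaningful for time-based partitions.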

type PrefilterCondition

type PrefilterCondition struct {
	// contains filtered or unexported fields
}

type PrefilterExpression

type PrefilterExpression struct {
	// contains filtered or unexported fields
}

func MinMax

func MinMax(fieldName string, condition NumericCondition) PrefilterExpression

func Partition

func Partition(condition StringCondition) PrefilterExpression

func PrefilterAnd

func PrefilterAnd(expressions ...PrefilterExpression) PrefilterExpression

func PrefilterOr

func PrefilterOr(expressions ...PrefilterExpression) PrefilterExpression

type Query

type Query struct {
	Prefilter *QueryPrefilter // for partitions and minmax indexes
	Bloom     *BloomQuery     // for field/token/fieldtoken searches
}

Query combines prefiltering (partitions/minmax) with bloom filtering

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery() *QueryBuilder

NewQuery creates a query builder with an implicit AND expression.

func (*QueryBuilder) Build

func (b *QueryBuilder) Build() *Query

func (*QueryBuilder) Field

func (b *QueryBuilder) Field(field string) *QueryBuilder

func (*QueryBuilder) FieldToken

func (b *QueryBuilder) FieldToken(field, token string) *QueryBuilder

func (*QueryBuilder) Match

func (b *QueryBuilder) Match(expression BloomExpression) *QueryBuilder

func (*QueryBuilder) MatchPrefilter

func (b *QueryBuilder) MatchPrefilter(expression PrefilterExpression) *QueryBuilder

Prefilter methods. MatchPrefilter applies a PrefilterExpression (partition and minmax conditions) to the query.

func (*QueryBuilder) Token

func (b *QueryBuilder) Token(token string) *QueryBuilder
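NewQuery starts an implicit AND, so each Field, Token, or FieldToken call adds a term that a data block's bloom filter must contain. The sketch below illustrates that idea with a hypothetical `builder` type and a map standing in for the bloom filter. It treats every call as an independent term, which is an assumption: the library may combine a chained `Field(...).Token(...)` differently, and the key prefixes are invented here.

```go
package main

import "fmt"

// term is one bloom lookup: a field name, a token, or both.
type term struct{ field, token string }

// key encodes a term as a lookup key. The "f:", "t:", "ft:" prefixes
// are invented for this sketch, not bloomsearch's actual encoding.
func (t term) key() string {
	switch {
	case t.field != "" && t.token != "":
		return "ft:" + t.field + ":" + t.token
	case t.field != "":
		return "f:" + t.field
	default:
		return "t:" + t.token
	}
}

// builder collects terms that are implicitly ANDed together.
type builder struct{ terms []term }

func newQuery() *builder { return &builder{} }

func (b *builder) Field(field string) *builder {
	b.terms = append(b.terms, term{field: field})
	return b
}

func (b *builder) Token(token string) *builder {
	b.terms = append(b.terms, term{token: token})
	return b
}

func (b *builder) FieldToken(field, token string) *builder {
	b.terms = append(b.terms, term{field: field, token: token})
	return b
}

// mayMatch is true only if every term is present in filter. A real
// bloom filter can give false positives (never false negatives), so a
// true result still requires checking the actual rows.
func (b *builder) mayMatch(filter map[string]bool) bool {
	for _, t := range b.terms {
		if !filter[t.key()] {
			return false
		}
	}
	return true
}

func main() {
	// Keys a row like {"level": "error", "service": "auth"} might produce.
	filter := map[string]bool{
		"f:level": true, "t:error": true, "ft:level:error": true,
		"f:service": true, "t:auth": true, "ft:service:auth": true,
	}
	fmt.Println(newQuery().FieldToken("level", "error").Field("service").mayMatch(filter)) // true
	fmt.Println(newQuery().FieldToken("level", "info").mayMatch(filter))                   // false
}
```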

type QueryOperator

type QueryOperator string

QueryOperator represents the type of comparison operation

const (
	// Equality operators
	OpEqual    QueryOperator = "EQ"
	OpNotEqual QueryOperator = "NE"

	// Comparison operators
	OpGreaterThan      QueryOperator = "GT"
	OpGreaterThanEqual QueryOperator = "GTE"
	OpLessThan         QueryOperator = "LT"
	OpLessThanEqual    QueryOperator = "LTE"

	// Set operators
	OpIn    QueryOperator = "IN"
	OpNotIn QueryOperator = "NOT_IN"

	// Range operators
	OpBetween    QueryOperator = "BETWEEN"
	OpNotBetween QueryOperator = "NOT_BETWEEN"
)

type QueryPrefilter

type QueryPrefilter struct {
	Expression *PrefilterExpression `json:",omitempty"`
}

QueryPrefilter represents prefilter conditions against partition IDs and minmax indexes. Expressions can be combined arbitrarily with AND/OR using PrefilterAnd and PrefilterOr.

func NewQueryPrefilter

func NewQueryPrefilter() *QueryPrefilter
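Because prefilter expressions nest arbitrarily with AND/OR, evaluating one is a recursive walk over a small tree. The sketch below shows that shape with hypothetical `expr`, `and`, `or`, `partitionEquals`, and `minMaxOverlaps` helpers (not the library's PrefilterAnd/PrefilterOr internals). Following the MetaStore contract, a file lacking a partition ID or a minmax index for the queried field is kept; using `""` to mean "no partition" is an assumption.

```go
package main

import "fmt"

// fileInfo is what a prefilter can see about a file (hypothetical shape).
type fileInfo struct {
	partition string
	minMax    map[string][2]int64 // field -> {min, max}
}

// expr is a prefilter node: a leaf predicate or an AND/OR of children.
type expr struct {
	leaf func(fileInfo) bool
	and  []expr
	or   []expr
}

func and(children ...expr) expr { return expr{and: children} }
func or(children ...expr) expr  { return expr{or: children} }

// eval walks the tree; an empty node keeps the file.
func (e expr) eval(f fileInfo) bool {
	switch {
	case e.leaf != nil:
		return e.leaf(f)
	case e.and != nil:
		for _, c := range e.and {
			if !c.eval(f) {
				return false
			}
		}
		return true
	case e.or != nil:
		for _, c := range e.or {
			if c.eval(f) {
				return true
			}
		}
		return false
	}
	return true
}

// partitionEquals keeps files in partition p, or with no partition ID.
func partitionEquals(p string) expr {
	return expr{leaf: func(f fileInfo) bool { return f.partition == "" || f.partition == p }}
}

// minMaxOverlaps keeps files whose [min, max] for field overlaps
// [lo, hi], or that have no index for field.
func minMaxOverlaps(field string, lo, hi int64) expr {
	return expr{leaf: func(f fileInfo) bool {
		r, ok := f.minMax[field]
		return !ok || (r[0] <= hi && lo <= r[1])
	}}
}

func main() {
	f := fileInfo{partition: "2024-01-02", minMax: map[string][2]int64{"ts": {100, 200}}}
	q := and(
		or(partitionEquals("2024-01-01"), partitionEquals("2024-01-02")),
		minMaxOverlaps("ts", 150, 400),
	)
	fmt.Println(q.eval(f))                                                              // true
	fmt.Println(and(partitionEquals("2024-01-03"), minMaxOverlaps("ts", 0, 50)).eval(f)) // false
}
```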

type StringCondition

type StringCondition struct {
	Operator QueryOperator `json:",omitempty"` // for EQ, NE, GT, GTE, LT, LTE
	Value    string        `json:",omitempty"` // for EQ, NE, GT, GTE, LT, LTE
	Values   []string      `json:",omitempty"` // for IN, NOT_IN
	Min      string        `json:",omitempty"` // for BETWEEN, NOT_BETWEEN
	Max      string        `json:",omitempty"` // for BETWEEN, NOT_BETWEEN
}

StringCondition represents a condition on string values (like partition IDs)

func PartitionBetween

func PartitionBetween(min, max string) StringCondition

PartitionBetween creates a partition BETWEEN condition (inclusive)

func PartitionEquals

func PartitionEquals(value string) StringCondition

PartitionEquals creates a partition equality condition

func PartitionGreaterThan

func PartitionGreaterThan(value string) StringCondition

PartitionGreaterThan creates a partition greater than condition

func PartitionGreaterThanEqual

func PartitionGreaterThanEqual(value string) StringCondition

PartitionGreaterThanEqual creates a partition greater than or equal condition

func PartitionIn

func PartitionIn(values ...string) StringCondition

PartitionIn creates a partition IN condition

func PartitionLessThan

func PartitionLessThan(value string) StringCondition

PartitionLessThan creates a partition less than condition

func PartitionLessThanEqual

func PartitionLessThanEqual(value string) StringCondition

PartitionLessThanEqual creates a partition less than or equal condition

func PartitionNotBetween

func PartitionNotBetween(min, max string) StringCondition

PartitionNotBetween creates a partition NOT BETWEEN condition (exclusive)

func PartitionNotEquals

func PartitionNotEquals(value string) StringCondition

PartitionNotEquals creates a partition not equal condition

func PartitionNotIn

func PartitionNotIn(values ...string) StringCondition

PartitionNotIn creates a partition NOT IN condition
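Partition conditions compare string IDs. Assuming the range operators use plain byte-wise Go string ordering (this page does not say), the hypothetical `matchPartition` below shows how each operator could be checked, and why zero-padded IDs matter for BETWEEN.

```go
package main

import "fmt"

type queryOperator string

const (
	opEqual            queryOperator = "EQ"
	opNotEqual         queryOperator = "NE"
	opGreaterThan      queryOperator = "GT"
	opGreaterThanEqual queryOperator = "GTE"
	opLessThan         queryOperator = "LT"
	opLessThanEqual    queryOperator = "LTE"
	opIn               queryOperator = "IN"
	opNotIn            queryOperator = "NOT_IN"
	opBetween          queryOperator = "BETWEEN"
	opNotBetween       queryOperator = "NOT_BETWEEN"
)

// stringCondition mirrors the fields of bloomsearch's StringCondition.
type stringCondition struct {
	Operator queryOperator
	Value    string
	Values   []string
	Min, Max string
}

func contains(values []string, s string) bool {
	for _, v := range values {
		if v == s {
			return true
		}
	}
	return false
}

// matchPartition reports whether partition ID id satisfies cond,
// assuming byte-wise string ordering for the range operators.
func matchPartition(id string, cond stringCondition) bool {
	switch cond.Operator {
	case opEqual:
		return id == cond.Value
	case opNotEqual:
		return id != cond.Value
	case opGreaterThan:
		return id > cond.Value
	case opGreaterThanEqual:
		return id >= cond.Value
	case opLessThan:
		return id < cond.Value
	case opLessThanEqual:
		return id <= cond.Value
	case opIn:
		return contains(cond.Values, id)
	case opNotIn:
		return !contains(cond.Values, id)
	case opBetween: // inclusive
		return cond.Min <= id && id <= cond.Max
	case opNotBetween: // outside the inclusive range
		return id < cond.Min || id > cond.Max
	default:
		return true // unknown operator: keep the partition (conservative)
	}
}

func main() {
	jan := stringCondition{Operator: opBetween, Min: "2024-01-01", Max: "2024-01-31"}
	fmt.Println(matchPartition("2024-01-15", jan)) // true
	// Without zero padding, "2024-1-9" sorts after "2024-1-31".
	unpadded := stringCondition{Operator: opBetween, Min: "2024-1-1", Max: "2024-1-31"}
	fmt.Println(matchPartition("2024-1-9", unpadded)) // false
}
```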

type ValueTokenizerFunc

type ValueTokenizerFunc func(value any) []string

ValueTokenizerFunc is a function that tokenizes a field value into a list of tokens
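As an example of the ValueTokenizerFunc shape, the hypothetical `simpleTokenizer` below stringifies a value, lowercases it, and splits on any rune that is not a letter or digit. It is an illustrative choice, not bloomsearch's default tokenizer.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// valueTokenizerFunc has the same shape as bloomsearch's ValueTokenizerFunc.
type valueTokenizerFunc func(value any) []string

// simpleTokenizer lowercases the value's string form and splits it on
// every rune that is not a letter or digit.
func simpleTokenizer(value any) []string {
	var s string
	switch v := value.(type) {
	case nil:
		return nil
	case string:
		s = v
	default:
		s = fmt.Sprint(v) // numbers, bools, etc.
	}
	return strings.FieldsFunc(strings.ToLower(s), func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
}

func main() {
	var tok valueTokenizerFunc = simpleTokenizer
	fmt.Println(tok("Database connection-failed")) // [database connection failed]
	fmt.Println(tok(404))                           // [404]
}
```

Since bloom lookups are exact-match, query tokens presumably need to match the tokenizer's output; with this tokenizer, a search for `ERROR` would have to be issued as `error`.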

type WriteOperation

type WriteOperation struct {
	FileMetadata     *FileMetadata
	FilePointerBytes []byte
}
