lsmkv

package v1.34.4
Published: Dec 3, 2025 License: BSD-3-Clause Imports: 62 Imported by: 1

Documentation

Overview

LSMKV (= Log-structured Merge-Tree Key-Value Store)

This package contains Weaviate's custom LSM store. While modeled after the usecases that are required for Weaviate to be fast, reliable, and scalable, it is technically completely independent. You could build your own database on top of this key-value store.

Covering the architecture of LSM stores in general is beyond the scope of this documentation; instead, it highlights what is specific to this implementation.

Strategies

To understand the different types of buckets in this store, you need to familiarize yourself with the following strategies. A strategy defines the use case a Bucket is optimized for.

  • "Replace"

    Replace resembles the classical key-value store. Each key has exactly one value. A subsequent PUT on an existing key replaces the value (hence the name "replace"). Once replaced, a former value can no longer be retrieved and will eventually be removed in compactions.

  • "Set" (aka "SetCollection")

    A set behaves like an unordered collection of independent values. In other words, a single key has multiple values. For example, for key "foo", you could have the values "bar1", "bar2", "bazzinga". A bucket of this type is optimized for cheap writes that add new set elements. For example, adding another set element has a fixed cost independent of the existing set's length. This makes it very well suited for building an inverted index.

    Retrieving a Set incurs a slight cost if the set is spread across multiple segments. This cost decreases as more and more compactions happen. In the ideal case (a fully compacted DB), retrieving a Set requires just a single disk read.

  • "Map" (aka "MapCollection")

    Maps are similar to Sets in the sense that a single key has multiple values. However, each value is itself a key-value pair. This makes it very similar to a dict or hashmap. For example, for key "foo", you could have the value pairs: "bar":17, "baz":19.

    This makes the Map type a great fit for an inverted index that needs to store additional info beyond just the doc-id pointer, such as a BM25 index where the term frequency must be stored.

    The same performance considerations as for Sets apply.
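To make the three strategies concrete, here is a minimal, self-contained sketch that models their semantics with plain Go maps. It is illustrative only and uses none of the real lsmkv API; the helper names (putReplace, setAdd, mapSet) are invented for this example.

```go
package main

import "fmt"

// mapPair mirrors the idea of a key-value pair stored under a map row.
type mapPair struct{ K, V string }

// putReplace models the Replace strategy: the latest value wins.
func putReplace(store map[string]string, key, value string) {
	store[key] = value
}

// setAdd models the Set strategy: append-only, independent values.
func setAdd(store map[string][]string, key string, values ...string) {
	store[key] = append(store[key], values...)
}

// mapSet models the Map strategy: one row key holds inner k/v pairs.
func mapSet(store map[string][]mapPair, rowKey string, kv mapPair) {
	store[rowKey] = append(store[rowKey], kv)
}

func main() {
	replace := map[string]string{}
	putReplace(replace, "foo", "v1")
	putReplace(replace, "foo", "v2") // overwrites; v1 is unrecoverable
	fmt.Println(replace["foo"])

	set := map[string][]string{}
	setAdd(set, "foo", "bar1", "bar2", "bazzinga")
	fmt.Println(len(set["foo"]))

	m := map[string][]mapPair{}
	mapSet(m, "foo", mapPair{"bar", "17"})
	mapSet(m, "foo", mapPair{"baz", "19"})
	fmt.Println(m["foo"][1].K)
}
```

In the real store, of course, all three strategies are LSM-backed and append-only; only their read-time semantics resemble these maps.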

Navigate around these docs

Good entrypoints to learn more about how this package works include Store with New and Store.CreateOrLoadBucket, as well as Bucket with Bucket.Get, Bucket.GetBySecondary, Bucket.Put, etc.

Each strategy also supports cursor types: CursorReplace can be created using Bucket.Cursor, CursorSet can be created with Bucket.SetCursor, and CursorMap can be created with Bucket.MapCursor.

Index

Constants

const (
	MetadataVersion    = 0
	MetadataFileSuffix = ".metadata"
)
const (
	// StrategyReplace allows for idempotent PUT where the latest value takes precedence
	StrategyReplace         = "replace"
	StrategySetCollection   = "setcollection"
	StrategyMapCollection   = "mapcollection"
	StrategyRoaringSet      = "roaringset"
	StrategyRoaringSetRange = "roaringsetrange"
	StrategyInverted        = "inverted"
)
const CountNetAdditionsFileSuffix = ".cna"
const CurrentCommitLogVersion uint8 = 1
const DeleteMarkerSuffix = ".deleteme"
const (
	FlushAfterDirtyDefault = 60 * time.Second
)

Variables

var DimensionsBucketPrioritizedStrategies = []string{
	StrategyRoaringSet,
	StrategyMapCollection,
}

Since v1.34, StrategyRoaringSet is the default strategy for the dimensions bucket; StrategyMapCollection is kept for backward compatibility with buckets created earlier.

var ErrAlreadyClosed = errors.New("store already closed")
var ErrBucketAlreadyRegistered = errors.New("bucket already registered")
var ErrInvalidChecksum = errors.New("invalid checksum")

ErrInvalidChecksum indicates that the read file should not be trusted. For any pre-computed data this is a recoverable issue, as the data can simply be re-computed at read-time.

var GlobalBucketRegistry *globalBucketRegistry

Functions

func CheckExpectedStrategy added in v1.18.0

func CheckExpectedStrategy(strategy string, expectedStrategies ...string) error

func CheckStrategyRoaringSet added in v1.26.0

func CheckStrategyRoaringSet(strategy string) error

func CheckStrategyRoaringSetRange added in v1.26.0

func CheckStrategyRoaringSetRange(strategy string) error

func DefaultSearchableStrategy added in v1.28.5

func DefaultSearchableStrategy(useInvertedSearchable bool) string

func DetermineUnloadedBucketStrategy added in v1.31.17

func DetermineUnloadedBucketStrategy(bucketPath string) (string, error)

func DetermineUnloadedBucketStrategyAmong added in v1.31.17

func DetermineUnloadedBucketStrategyAmong(bucketPath string, prioritizedStrategies []string) (string, error)

func DoBlockMaxAnd added in v1.30.17

func DoBlockMaxAnd(ctx context.Context, limit int, resultsByTerm Terms, averagePropLength float64, additionalExplanations bool,
	termCount int, minimumOrTokensMatch int, logger logrus.FieldLogger,
) *priorityqueue.Queue[[]*terms.DocPointerWithScore]

func DoBlockMaxWand added in v1.28.5

func DoBlockMaxWand(ctx context.Context, limit int, results Terms, averagePropLength float64, additionalExplanations bool,
	termCount, minimumOrTokensMatch int, logger logrus.FieldLogger,
) (*priorityqueue.Queue[[]*terms.DocPointerWithScore], error)

func DoWand added in v1.28.5

func DoWand(ctx context.Context, limit int, results *terms.Terms, averagePropLength float64, additionalExplanations bool,
	minimumOrTokensMatch int, logger logrus.FieldLogger,
) *priorityqueue.Queue[[]*terms.DocPointerWithScore]

func IsExpectedStrategy added in v1.18.0

func IsExpectedStrategy(strategy string, expectedStrategies ...string) bool

func MustBeExpectedStrategy added in v1.26.0

func MustBeExpectedStrategy(strategy string, expectedStrategies ...string)

func ParseCollectionNode

func ParseCollectionNode(r io.Reader) (segmentCollectionNode, error)

ParseCollectionNode reads from r and parses the collection values into a segmentCollectionNode

When only given an offset, r is constructed as a *bufio.Reader to avoid first reading the entire segment (could be GBs). Each consecutive read will be buffered to avoid excessive syscalls.

When we already have a finite and manageable []byte (i.e. when we have already seeked to an lsmkv node and have start+end offset), r should be constructed as a *bytes.Reader, since the contents have already been `pread` from the segment contentFile.

func ParseCollectionNodeInto

func ParseCollectionNodeInto(r io.Reader, node *segmentCollectionNode) error

ParseCollectionNodeInto takes the []byte slice and parses it into the specified node. It does not perform any copies and the caller must be aware that memory may be shared between the two. As a result, the caller must make sure that they do not modify "in" while "node" is still in use. A safer alternative is to use ParseCollectionNode.

The primary intention of this function is to provide a way to reuse buffers when the lifetime is controlled tightly, for example in cursors used within compactions. Use at your own risk!

If the buffers of the provided node have enough capacity they will be reused. Only if the capacity is not enough, will an allocation occur. This allocation uses 25% overhead to avoid future allocations for nodes of similar size.

As a result, calling this method only makes sense if you plan on calling it multiple times. Calling it just once on an uninitialized node has no major advantage over calling ParseCollectionNode.

func ParseInvertedNode added in v1.28.0

func ParseInvertedNode(r io.Reader) (segmentCollectionNode, error)

ParseInvertedNode reads from r and parses the Inverted values into a segmentCollectionNode

When only given an offset, r is constructed as a *bufio.Reader to avoid first reading the entire segment (could be GBs). Each consecutive read will be buffered to avoid excessive syscalls.

When we already have a finite and manageable []byte (i.e. when we have already seeked to an lsmkv node and have start+end offset), r should be constructed as a *bytes.Reader, since the contents have already been `pread` from the segment contentFile.

func ParseReplaceNode

func ParseReplaceNode(r io.Reader, secondaryIndexCount uint16) (segmentReplaceNode, error)

func ParseReplaceNodeIntoMMAP added in v1.23.0

func ParseReplaceNodeIntoMMAP(r *byteops.ReadWriter, secondaryIndexCount uint16, out *segmentReplaceNode) error

func ParseReplaceNodeIntoPread added in v1.23.0

func ParseReplaceNodeIntoPread(r io.Reader, secondaryIndexCount uint16, out *segmentReplaceNode) (err error)

func ReadCountNetAdditionsFile added in v1.32.0

func ReadCountNetAdditionsFile(path string) (int64, error)

ReadCountNetAdditionsFile reads a .cna file and returns the count net additions value. It returns (count, nil) on success, or (0, error) if the file is invalid or corrupted.

func ReadObjectCountFromMetadataFile added in v1.32.1

func ReadObjectCountFromMetadataFile(path string) (int64, error)

ReadObjectCountFromMetadataFile reads a .metadata file and returns the object count. It returns (count, nil) on success, or (0, error) if the file is invalid or corrupted.

func SegmentStrategyFromString

func SegmentStrategyFromString(in string) segmentindex.Strategy

Types

type BlockMetrics added in v1.28.0

type BlockMetrics struct {
	BlockCountTotal         uint64
	BlockCountDecodedDocIds uint64
	BlockCountDecodedFreqs  uint64
	DocCountTotal           uint64
	DocCountDecodedDocIds   uint64
	DocCountDecodedFreqs    uint64
	DocCountScored          uint64
	QueryCount              uint64
	LastAddedBlock          int
}

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func NewBucketCreator added in v1.25.0

func NewBucketCreator() *Bucket

func (*Bucket) ApplyToObjectDigests added in v1.28.5

func (b *Bucket) ApplyToObjectDigests(ctx context.Context,
	afterInMemCallback func(), f func(object *storobj.Object) error,
) error

ApplyToObjectDigests iterates over all objects in the bucket, both in memtable and on disk, and applies the given function to each object. The afterInMemCallback is called after the in-memory memtable has been processed. This allows the caller to perform actions that need to happen after the in-memory objects have been processed. The function f is called for each object, and if it returns an error, the processing is stopped and the error is returned. Note: this function pauses compaction while it is running, to ensure a consistent view of the data.

func (*Bucket) Count

func (b *Bucket) Count(ctx context.Context) (int, error)

func (*Bucket) CountAsync added in v1.23.10

func (b *Bucket) CountAsync() int

CountAsync ignores the current memtable, which makes it async because it only reflects what has already been flushed. This in turn makes it very cheap to call, so it can be used for observability purposes where eventual consistency on the count is fine, but a large cost is not.

func (*Bucket) CreateDiskTerm added in v1.28.0

func (b *Bucket) CreateDiskTerm(N float64, filterDocIds helpers.AllowList, query []string, propName string, propertyBoost float32, duplicateTextBoosts []int, config schema.BM25Config, ctx context.Context) ([][]*SegmentBlockMax, map[string]uint64, func(), error)

func (*Bucket) Cursor

func (b *Bucket) Cursor() *CursorReplace

Cursor allows you to scan over all key-value pairs in the bucket. You can start with the first element using .First() or seek to an arbitrary key using .Seek(key). You have reached the end when no more keys are returned.

During the lifetime of the cursor, you have a consistent view of the bucket. It does not hold any locks to do so. It only holds the flushLock during initialization. Nevertheless, it must be closed using the .Close() method, as it holds references to underlying disk segments.

There are no references to memtables, as their entire content is copied at init time. This is also a potential limitation of a cursor: the initialization can be quite costly if memtables are large.
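The snapshot behavior described above can be sketched with a toy cursor that copies its keyspace at init time. This is not the real CursorReplace, just an illustration of the First/Seek/Next contract and of why later writes are invisible to an open cursor.

```go
package main

import (
	"fmt"
	"sort"
)

// cursor is a toy stand-in for CursorReplace: it iterates a sorted,
// point-in-time copy of the keyspace, so later writes to src are not seen.
type cursor struct {
	keys []string
	vals map[string]string
	pos  int
}

func newCursor(src map[string]string) *cursor {
	c := &cursor{vals: map[string]string{}}
	for k, v := range src {
		c.keys = append(c.keys, k)
		c.vals[k] = v
	}
	sort.Strings(c.keys)
	return c
}

// first positions the cursor at the smallest key.
func (c *cursor) first() (string, string, bool) {
	c.pos = 0
	return c.next0()
}

// seek positions the cursor at the first key >= target.
func (c *cursor) seek(target string) (string, string, bool) {
	c.pos = sort.SearchStrings(c.keys, target)
	return c.next0()
}

// next returns the following key-value pair, or ok=false at the end.
func (c *cursor) next() (string, string, bool) { return c.next0() }

func (c *cursor) next0() (string, string, bool) {
	if c.pos >= len(c.keys) {
		return "", "", false
	}
	k := c.keys[c.pos]
	c.pos++
	return k, c.vals[k], true
}

func main() {
	src := map[string]string{"a": "1", "c": "3", "b": "2"}
	cur := newCursor(src)
	src["d"] = "4" // written after init: invisible to the cursor
	for k, v, ok := cur.first(); ok; k, v, ok = cur.next() {
		fmt.Println(k, v)
	}
	k, _, _ := cur.seek("b")
	fmt.Println("seek:", k)
}
```

The real cursor achieves the same effect without copying disk segments; it only copies memtable contents and holds references to segments, which is why .Close() is mandatory there.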

func (*Bucket) CursorInMem added in v1.28.5

func (b *Bucket) CursorInMem() *CursorReplace

CursorInMem returns a cursor which scans over the primary key of entries not yet persisted on disk. Segment creation and compaction are blocked until the cursor is closed.

func (*Bucket) CursorOnDisk added in v1.28.5

func (b *Bucket) CursorOnDisk() *CursorReplace

CursorOnDisk returns a cursor which scans over the primary key of entries already persisted on disk. New segments can still be created, but compaction is prevented while any cursor remains active.

func (*Bucket) CursorRoaringSet added in v1.18.0

func (b *Bucket) CursorRoaringSet() CursorRoaringSet

CursorRoaringSet behaves like [Cursor], but for the RoaringSet strategy. It needs to be closed using .Close() to free references to the underlying segments.

func (*Bucket) CursorRoaringSetKeyOnly added in v1.18.0

func (b *Bucket) CursorRoaringSetKeyOnly() CursorRoaringSet

CursorRoaringSetKeyOnly is the equivalent of CursorRoaringSet, but only returns keys. See [Cursor] for details on snapshot isolation. It needs to be closed using .Close() to free references to the underlying disk segments.

func (*Bucket) CursorWithSecondaryIndex added in v1.26.0

func (b *Bucket) CursorWithSecondaryIndex(pos int) *CursorReplace

CursorWithSecondaryIndex holds an RLock for the flushing state. It needs to be closed using the .Close() method, otherwise the lock will never be released.

func (*Bucket) DebugGetSegmentGroupLockStatus added in v1.31.17

func (b *Bucket) DebugGetSegmentGroupLockStatus() (bool, error)

DEBUG METHOD: do not use in any real production use case. This method returns the lock status of the segment group in the bucket.

func (*Bucket) Delete

func (b *Bucket) Delete(key []byte, opts ...SecondaryKeyOption) (err error)

Delete removes the given row. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.

Delete is specific to the Replace Strategy. For Maps, you can use Bucket.MapDeleteKey to delete a single key-value pair, for Sets use Bucket.SetDeleteSingle to delete a single set element.
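The tombstone mechanics described above can be modeled with a small, self-contained sketch (not the real implementation): deletes append a tombstone entry to the newest segment, and lookups scan newest-to-oldest, so older copies of the key remain on disk until a compaction removes them.

```go
package main

import "fmt"

// entry is a toy segment record: a tombstone marks a deletion without
// physically removing older copies of the key.
type entry struct {
	key, value string
	tombstone  bool
}

// store holds segments, oldest first; puts and deletes both append
// to the newest segment (append-only, as in an LSM store).
type store struct{ segments [][]entry }

func (s *store) put(k, v string) { s.appendEntry(entry{key: k, value: v}) }
func (s *store) del(k string)    { s.appendEntry(entry{key: k, tombstone: true}) }

func (s *store) appendEntry(e entry) {
	if len(s.segments) == 0 {
		s.segments = append(s.segments, nil)
	}
	last := len(s.segments) - 1
	s.segments[last] = append(s.segments[last], e)
}

// get scans newest to oldest; the first hit wins, and a tombstone
// means the key is considered deleted even though old data remains.
func (s *store) get(k string) (string, bool) {
	for i := len(s.segments) - 1; i >= 0; i-- {
		seg := s.segments[i]
		for j := len(seg) - 1; j >= 0; j-- {
			if seg[j].key == k {
				if seg[j].tombstone {
					return "", false
				}
				return seg[j].value, true
			}
		}
	}
	return "", false
}

func main() {
	s := &store{}
	s.put("foo", "bar")
	s.del("foo") // appends a tombstone; "bar" still exists in the segment
	_, ok := s.get("foo")
	fmt.Println(ok)
}
```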

func (*Bucket) DeleteWith added in v1.25.26

func (b *Bucket) DeleteWith(key []byte, deletionTime time.Time, opts ...SecondaryKeyOption) (err error)

func (*Bucket) DesiredStrategy added in v1.18.0

func (b *Bucket) DesiredStrategy() string

func (*Bucket) DocPointerWithScoreList added in v1.24.25

func (b *Bucket) DocPointerWithScoreList(ctx context.Context, key []byte, propBoost float32, cfgs ...MapListOption) ([]terms.DocPointerWithScore, error)

func (*Bucket) FlushAndSwitch

func (b *Bucket) FlushAndSwitch() error

FlushAndSwitch is the main way to flush a memtable, replace it with a new one, and make sure that the flushed segment gets added to the segment group.

Flushing and adding a segment can take considerable time, which is why the whole process is designed to be non-blocking.

To achieve a non-blocking flush, the process is split into four parts:

  1. atomicallySwitchMemtable: A new memtable is created, the previous memtable is moved from "active" to "flushing". This switch is blocking (holds b.flushLock.Lock()), but extremely fast, as we essentially just switch a pointer.

  2. flush: The previous memtable is flushed to disk. This may take considerable time, as we are I/O-bound. It is done "in the background", meaning that it does not block any CRUD operations for the user. It only blocks the flush process itself, meaning only one flush per bucket can happen at a time. This is by design.

  3. initAndPrecomputeNewSegment: (Newly added in https://github.com/weaviate/weaviate/pull/5943, early October 2024). After the previous flush step, the segment can now be initialized. However, to make it usable, we still need to compute metadata, such as bloom filters (all types) and net count additions (only the Replace type). Bloom filters can be calculated in isolation and are therefore fairly trivial. Net count additions, on the other hand, are more complex, as they depend on all previous segments. Calculating them can take a considerable amount of time, especially as buckets grow larger. As a result, we need to provide two guarantees: (1) the calculation is non-blocking from a user's POV and (2) for the duration of the calculation, the segment group is considered stable, i.e. no other segments are added, removed, or merged. We achieve this by holding a `b.disk.maintenanceLock.RLock()`, which prevents modification of the segments array but does not block user operations (which are themselves RLock-holders on that same lock).

  4. atomicallyAddDiskSegmentAndRemoveFlushing: The previous step returned a fully initialized segment that has not yet been added to the segment group. This last step is the counterpart to the first step and is again blocking, but fast. It adds the segment to the segment group, which at this point is just a simple array append. At the same time it removes the "flushing" memtable. It holds the `b.flushLock.Lock()`, making this operation atomic, but blocking.

FlushAndSwitch is typically called periodically and does not require manual calling, but there are some situations where this might be intended, such as in test scenarios or when a force flush is desired.
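The blocking-but-fast switch in steps 1 and 4 can be sketched as follows. This is a simplified model, not the actual implementation: the real flush writes to disk and precomputes metadata, while here it is just a variable read. The point is that the expensive work happens outside the lock; only pointer moves happen inside it.

```go
package main

import (
	"fmt"
	"sync"
)

type memtable map[string]string

type bucket struct {
	flushLock sync.Mutex
	active    memtable
	flushing  memtable
	disk      []memtable // flushed "segments"
}

// switchMemtable is step 1: blocking but O(1), just pointer moves.
func (b *bucket) switchMemtable() {
	b.flushLock.Lock()
	defer b.flushLock.Unlock()
	b.flushing = b.active
	b.active = memtable{}
}

// flushAndAdd covers steps 2-4: "flush" outside the lock, then
// atomically add the segment and drop the flushing memtable.
func (b *bucket) flushAndAdd() {
	seg := b.flushing // stands in for the slow disk flush + segment init
	b.flushLock.Lock()
	defer b.flushLock.Unlock()
	b.disk = append(b.disk, seg)
	b.flushing = nil
}

func main() {
	b := &bucket{active: memtable{"k": "v"}}
	b.switchMemtable()
	b.flushAndAdd()
	fmt.Println(len(b.disk), len(b.active))
}
```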

func (*Bucket) FlushMemtable

func (b *Bucket) FlushMemtable() error

FlushMemtable flushes any active memtable and returns only once the memtable has been fully flushed and a stable state on disk has been reached.

This is a preparatory stage for creating backups.

This method should only be run if the flush cycle is not running (it was not started, has been stopped, or a noop implementation is provided).

func (*Bucket) Get

func (b *Bucket) Get(key []byte) ([]byte, error)

Get retrieves the single value for the given key.

Get is specific to ReplaceStrategy and cannot be used with any of the other strategies. Use Bucket.SetList or Bucket.MapList instead.

Get uses the regular or "primary" key for an object. If a bucket has secondary indexes, use Bucket.GetBySecondary to retrieve an object using its secondary key

func (*Bucket) GetAveragePropertyLength added in v1.28.14

func (b *Bucket) GetAveragePropertyLength() (float64, error)

func (*Bucket) GetBySecondary

func (b *Bucket) GetBySecondary(ctx context.Context, pos int, key []byte) ([]byte, error)

GetBySecondary retrieves an object using one of its secondary keys. A bucket can have an infinite number of secondary keys. Specify the secondary key position as the first argument.

A real-life example of secondary keys is the Weaviate object store. Objects are stored with the user-facing ID as their primary key and with the doc-id (an ever-increasing uint64) as the secondary key.

Similar to Bucket.Get, GetBySecondary is limited to ReplaceStrategy. No equivalent exists for Set and Map, as those do not support secondary indexes.
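The primary/secondary relationship can be modeled with a small self-contained sketch (hypothetical types, not the real API): each secondary index position maps its own key back to the primary key, under which the value is stored.

```go
package main

import "fmt"

// replaceBucket is a toy model of a Replace bucket with secondary
// indexes: values live under the primary key, and each secondary
// index position maps its own key back to the primary.
type replaceBucket struct {
	primary   map[string]string
	secondary []map[string]string // one map per secondary index position
}

func newReplaceBucket(secondaryIndices int) *replaceBucket {
	b := &replaceBucket{primary: map[string]string{}}
	for i := 0; i < secondaryIndices; i++ {
		b.secondary = append(b.secondary, map[string]string{})
	}
	return b
}

func (b *replaceBucket) put(key, value string, secKeys ...string) {
	b.primary[key] = value
	for pos, sk := range secKeys {
		b.secondary[pos][sk] = key
	}
}

func (b *replaceBucket) getBySecondary(pos int, secKey string) (string, bool) {
	primaryKey, ok := b.secondary[pos][secKey]
	if !ok {
		return "", false
	}
	v, ok := b.primary[primaryKey]
	return v, ok
}

func main() {
	// Mirrors the object-store example: UUID as primary, doc-id as secondary.
	b := newReplaceBucket(1)
	b.put("uuid-123", `{"name":"obj"}`, "docid-42")
	v, _ := b.getBySecondary(0, "docid-42")
	fmt.Println(v)
}
```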

func (*Bucket) GetBySecondaryWithBuffer added in v1.22.6

func (b *Bucket) GetBySecondaryWithBuffer(ctx context.Context, pos int, seckey []byte, buffer []byte) ([]byte, []byte, error)

GetBySecondaryWithBuffer is like Bucket.GetBySecondary, but also takes a buffer. It is the responsibility of the caller to pool the buffer, since the bucket does not know when the caller is done using it. The returned bytes will likely point to the same memory that is part of the buffer. However, if the buffer is too small, a larger buffer may also be returned (second return value).

Similar to Bucket.Get, GetBySecondaryWithBuffer is limited to ReplaceStrategy. No equivalent exists for Set and Map, as those do not support secondary indexes.

func (*Bucket) GetDesiredStrategy added in v1.22.0

func (b *Bucket) GetDesiredStrategy() string

func (*Bucket) GetDir added in v1.22.0

func (b *Bucket) GetDir() string

func (*Bucket) GetErrDeleted added in v1.23.15

func (b *Bucket) GetErrDeleted(key []byte) ([]byte, error)

func (*Bucket) GetFlushCallbackCtrl added in v1.22.0

func (b *Bucket) GetFlushCallbackCtrl() cyclemanager.CycleCallbackCtrl

func (*Bucket) GetMemtableThreshold added in v1.22.0

func (b *Bucket) GetMemtableThreshold() uint64

func (*Bucket) GetRootDir added in v1.22.0

func (b *Bucket) GetRootDir() string

func (*Bucket) GetSecondaryIndices added in v1.22.0

func (b *Bucket) GetSecondaryIndices() uint16

func (*Bucket) GetStatus added in v1.22.0

func (b *Bucket) GetStatus() storagestate.Status

func (*Bucket) GetStrategy added in v1.22.0

func (b *Bucket) GetStrategy() string

func (*Bucket) GetWalThreshold added in v1.22.0

func (b *Bucket) GetWalThreshold() uint64

func (*Bucket) IterateMapObjects added in v1.22.0

func (b *Bucket) IterateMapObjects(ctx context.Context, f func([]byte, []byte, []byte, bool) error) error

func (*Bucket) IterateObjects

func (b *Bucket) IterateObjects(ctx context.Context, f func(object *storobj.Object) error) error

func (*Bucket) ListFiles

func (b *Bucket) ListFiles(ctx context.Context, basePath string) ([]string, error)

ListFiles lists all files that currently exist in the Bucket. The files are only in a stable state if the memtable is empty and compactions are paused. If either condition is not met, it returns an error.

func (*Bucket) MapCursor

func (b *Bucket) MapCursor(cfgs ...MapListOption) (*CursorMap, error)

func (*Bucket) MapCursorKeyOnly

func (b *Bucket) MapCursorKeyOnly(cfgs ...MapListOption) (*CursorMap, error)

func (*Bucket) MapDeleteKey

func (b *Bucket) MapDeleteKey(rowKey, mapKey []byte) error

MapDeleteKey removes one key-value pair from the given map row. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.

MapDeleteKey is specific to the Map Strategy. For Replace, you can use Bucket.Delete to delete the entire row, for Sets use Bucket.SetDeleteSingle to delete a single set element.

func (*Bucket) MapList

func (b *Bucket) MapList(ctx context.Context, key []byte, cfgs ...MapListOption) ([]MapPair, error)

MapList returns all map entries for a given row key. The order of map pairs has no specific meaning. For efficient merge operations, pair entries are stored sorted on disk, however that is an implementation detail and not a caller-facing guarantee.

MapList is specific to the Map strategy, for Sets use Bucket.SetList, for Replace use Bucket.Get.

func (*Bucket) MapSet

func (b *Bucket) MapSet(rowKey []byte, kv MapPair) error

MapSet writes one MapPair into the map for the given row key. It is agnostic of whether the row key already exists, as well as agnostic of whether the map key already exists. In both cases it will create the entry if it does not exist, or overwrite it if it does.

Example to add a new MapPair:

pair := MapPair{Key: []byte("Jane"), Value: []byte("Backend")}
err := bucket.MapSet([]byte("developers"), pair)
if err != nil {
	/* do something */
}

MapSet is specific to the Map Strategy, for Replace use Bucket.Put, and for Set use Bucket.SetAdd instead.

func (*Bucket) MapSetMulti

func (b *Bucket) MapSetMulti(rowKey []byte, kvs []MapPair) error

MapSetMulti is the same as Bucket.MapSet, except that it takes in multiple MapPair objects at the same time.

func (*Bucket) NewBucket added in v1.25.0

func (*Bucket) NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger,
	metrics *Metrics, compactionCallbacks, flushCallbacks cyclemanager.CycleCallbackGroup,
	opts ...BucketOption,
) (b *Bucket, err error)

NewBucket initializes a new bucket. It either loads the state from disk if it exists, or initializes new state.

You do not need to ever call NewBucket() yourself, if you are using a Store. In this case the Store can manage buckets for you, using methods such as CreateOrLoadBucket().

func (*Bucket) Put

func (b *Bucket) Put(key, value []byte, opts ...SecondaryKeyOption) (err error)

Put creates or replaces a single value for a given key.

err := bucket.Put([]byte("my_key"), []byte("my_value"))
if err != nil {
	/* do something */
}

If a bucket has a secondary index configured, you can also specify one or more secondary keys, like so:

err := bucket.Put([]byte("my_key"), []byte("my_value"),
	WithSecondaryKey(0, []byte("my_alternative_key")),
)
if err != nil {
	/* do something */
}

Put is limited to ReplaceStrategy; use Bucket.SetAdd for Set, or Bucket.MapSet and Bucket.MapSetMulti for Map.

func (*Bucket) QuantileKeys added in v1.25.11

func (b *Bucket) QuantileKeys(q int) [][]byte

QuantileKeys returns an approximation of the keys that make up the specified quantiles. This can be used to start parallel cursors at fairly evenly distributed positions in the segment.

To understand the approximation, check out [lsmkv.segmentindex.DiskTree.QuantileKeys], which runs on each segment.

Some things to keep in mind:

  1. It may return fewer keys than requested (including 0) if the segment contains fewer entries
  2. It may return keys that do not exist, for example because they are tombstoned. This is acceptable, as a key does not have to exist to be used as part of .Seek() in a cursor.
  3. It will never return duplicates, to make sure all parallel cursors return unique values.
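The behavior described in points 1 and 3 can be approximated with a small sketch over an already-sorted key slice. The real implementation works per segment on a disk tree; the helper name quantileKeys is invented here.

```go
package main

import "fmt"

// quantileKeys picks q evenly spaced keys from a sorted slice to seed
// parallel cursors. It may return fewer keys than requested and never
// returns duplicates, matching the properties described above.
func quantileKeys(sortedKeys []string, q int) []string {
	if q <= 0 || len(sortedKeys) == 0 {
		return nil
	}
	if q > len(sortedKeys) {
		q = len(sortedKeys) // fewer entries than quantiles requested
	}
	seen := map[string]bool{}
	var out []string
	for i := 0; i < q; i++ {
		k := sortedKeys[i*len(sortedKeys)/q]
		if !seen[k] { // never return duplicates
			seen[k] = true
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	fmt.Println(quantileKeys(keys, 4))
}
```

Each returned key can then be passed to a cursor's .Seek() to start one of several parallel scans at roughly even positions.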

func (*Bucket) ReaderRoaringSetRange added in v1.26.5

func (b *Bucket) ReaderRoaringSetRange() ReaderRoaringSetRange

func (*Bucket) RoaringSetAddBitmap added in v1.18.0

func (b *Bucket) RoaringSetAddBitmap(key []byte, bm *sroar.Bitmap) error

func (*Bucket) RoaringSetAddList added in v1.18.0

func (b *Bucket) RoaringSetAddList(key []byte, values []uint64) error

func (*Bucket) RoaringSetAddOne added in v1.18.0

func (b *Bucket) RoaringSetAddOne(key []byte, value uint64) error

func (*Bucket) RoaringSetGet added in v1.18.0

func (b *Bucket) RoaringSetGet(key []byte) (bm *sroar.Bitmap, release func(), err error)

func (*Bucket) RoaringSetRangeAdd added in v1.26.0

func (b *Bucket) RoaringSetRangeAdd(key uint64, values ...uint64) error

func (*Bucket) RoaringSetRangeRemove added in v1.26.0

func (b *Bucket) RoaringSetRangeRemove(key uint64, values ...uint64) error

func (*Bucket) RoaringSetRemoveOne added in v1.18.0

func (b *Bucket) RoaringSetRemoveOne(key []byte, value uint64) error

func (*Bucket) SetAdd

func (b *Bucket) SetAdd(key []byte, values [][]byte) error

SetAdd adds one or more Set entries to a Set for the given key. SetAdd is entirely agnostic of existing entries; it acts as append-only. This also makes it agnostic of whether the key already exists or not.

Example to add two entries to a set:

err := bucket.SetAdd([]byte("my_key"), [][]byte{
	[]byte("one-set-element"), []byte("another-set-element"),
})
if err != nil {
	/* do something */
}

SetAdd is specific to the Set strategy. For Replace, use Bucket.Put, for Map use either Bucket.MapSet or Bucket.MapSetMulti.

func (*Bucket) SetCursor

func (b *Bucket) SetCursor() *CursorSet

SetCursor behaves like [Cursor], but for the Set strategy. It needs to be closed using .Close() to free references to the underlying segments.

func (*Bucket) SetCursorKeyOnly

func (b *Bucket) SetCursorKeyOnly() *CursorSet

SetCursorKeyOnly returns nil for all values. It has no control over the underlying "inner" cursors, which may still retrieve a value that is then discarded. It does, however, omit any handling of values, such as decoding, making it considerably more efficient if only keys are required.

The same locking rules as for SetCursor apply.

func (*Bucket) SetDeleteSingle

func (b *Bucket) SetDeleteSingle(key []byte, valueToDelete []byte) error

SetDeleteSingle removes one Set element from the given key. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.

SetDeleteSingle is specific to the Set Strategy. For Replace, you can use Bucket.Delete to delete the entire row, for Maps use Bucket.MapDeleteKey to delete a single map entry.

func (*Bucket) SetList

func (b *Bucket) SetList(key []byte) ([][]byte, error)

SetList returns all Set entries for a given key.

SetList is specific to the Set Strategy, for Map use Bucket.MapList, and for Replace use Bucket.Get.

func (*Bucket) SetMemtableThreshold

func (b *Bucket) SetMemtableThreshold(size uint64)

func (*Bucket) Shutdown

func (b *Bucket) Shutdown(ctx context.Context) (err error)

func (*Bucket) Strategy

func (b *Bucket) Strategy() string

func (*Bucket) UpdateStatus

func (b *Bucket) UpdateStatus(status storagestate.Status)

UpdateStatus is used by the parent shard to communicate to the bucket when the shard has been set to readonly, or when it is ready for writes.

func (*Bucket) WasDeleted added in v1.18.0

func (b *Bucket) WasDeleted(key []byte) (bool, time.Time, error)

WasDeleted determines if an object used to exist in the LSM store.

There are three locations that need to be checked for the key, in this order: the active memtable, the flushing memtable, and the disk segments.

func (*Bucket) WriteWAL

func (b *Bucket) WriteWAL() error

The WAL uses a buffer and is not written until the buffer size is exceeded or this function is explicitly called. This avoids unnecessary disk writes in larger operations, such as batches, where it is sufficient to write the WAL just once. This does not make a batch atomic, but it guarantees that the WAL is written before a successful response is returned to the user.
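The buffering behavior can be sketched with a toy WAL backed by a bufio.Writer. This is illustrative only; the real WAL writes to a file and has its own record format.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// wal buffers appended records in memory; nothing reaches "disk"
// until an explicit flush (standing in for WriteWAL).
type wal struct {
	disk bytes.Buffer // stands in for the WAL file
	buf  *bufio.Writer
}

func newWAL() *wal {
	w := &wal{}
	w.buf = bufio.NewWriterSize(&w.disk, 4096)
	return w
}

// append buffers one record; it does not hit disk yet.
func (w *wal) append(rec string) error {
	_, err := w.buf.WriteString(rec + "\n")
	return err
}

// writeWAL flushes once, after the whole batch, before acking the user.
func (w *wal) writeWAL() error { return w.buf.Flush() }

func main() {
	w := newWAL()
	for i := 0; i < 3; i++ {
		w.append(fmt.Sprintf("put k%d", i)) // buffered, not yet on disk
	}
	fmt.Println(w.disk.Len()) // still 0: buffer not flushed yet
	w.writeWAL()
	fmt.Println(w.disk.Len())
}
```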

type BucketConsistentView added in v1.31.18

type BucketConsistentView struct {
	Active   memtable
	Flushing memtable
	Disk     []Segment
	// contains filtered or unexported fields
}

func (*BucketConsistentView) Release added in v1.31.18

func (cv *BucketConsistentView) Release()

type BucketCreator added in v1.25.0

type BucketCreator interface {
	NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger,
		metrics *Metrics, compactionCallbacks, flushCallbacks cyclemanager.CycleCallbackGroup,
		opts ...BucketOption,
	) (*Bucket, error)
}

type BucketOption

type BucketOption func(b *Bucket) error

func WithAllocChecker added in v1.24.9

func WithAllocChecker(mm memwatch.AllocChecker) BucketOption

func WithBM25Config added in v1.28.14

func WithBM25Config(bm25Config *models.BM25Config) BucketOption

func WithBitmapBufPool added in v1.29.4

func WithBitmapBufPool(bufPool roaringset.BitmapBufPool) BucketOption

func WithCalcCountNetAdditions added in v1.23.0

func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption

func WithDirtyThreshold added in v1.24.2

func WithDirtyThreshold(threshold time.Duration) BucketOption

func WithDisableCompaction added in v1.30.0

func WithDisableCompaction(disable bool) BucketOption

func WithDynamicMemtableSizing

func WithDynamicMemtableSizing(
	initialMB, maxMB, minActiveSeconds, maxActiveSeconds int,
) BucketOption

func WithForceCompaction added in v1.26.14

func WithForceCompaction(opt bool) BucketOption

Background for this option:

We use the LSM store in two places:

  • Our existing key/value and inverted buckets
  • As part of the new brute-force based index (to be built this week)

Brute-force index: This is a simple disk index where we use a cursor to iterate over all objects, and this is what we need force-compaction for. Experimentation so far has shown that the cursor is much more performant on a single segment than on multiple segments. With a single segment, it is essentially just one contiguous chunk of data on disk that we read through; with multiple segments (and an unpredictable order), it ends up being many tiny, inefficient reads.

Existing uses of the LSM store: For existing uses, e.g. the object store, we do not want to force-compact, because those buckets can grow massive. For example, you could have a 100GB segment, and a new write then leads to a new segment that is just a few bytes. If we force-compacted those two, we would write 100GB every time the user sends a few bytes to Weaviate. For this case, the existing tiered compaction strategy makes more sense.
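The 100GB example can be made concrete with a quick write-amplification estimate. This is a back-of-the-envelope sketch; the `writeAmplification` helper and the numbers are illustrative, not lsmkv code:

```go
package main

import "fmt"

// writeAmplification models the cost argument above: compacting two
// segments rewrites their combined size, so merging a tiny new segment
// into a huge existing one costs roughly the huge segment's size per
// user write.
func writeAmplification(segmentBytes, writeBytes int64) int64 {
	if writeBytes == 0 {
		return 0
	}
	return (segmentBytes + writeBytes) / writeBytes
}

func main() {
	const gb = int64(1) << 30
	// a 16-byte write force-compacted against a 100 GB segment:
	fmt.Println(writeAmplification(100*gb, 16)) // billions of bytes rewritten per byte written
}
```

This is why force-compaction only pays off for cursor-heavy, bounded-size buckets, while large object buckets are better served by tiered compaction.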

func WithKeepLevelCompaction added in v1.30.11

func WithKeepLevelCompaction(keepLevelCompaction bool) BucketOption

func WithKeepSegmentsInMemory added in v1.29.4

func WithKeepSegmentsInMemory(keep bool) BucketOption

func WithKeepTombstones added in v1.23.0

func WithKeepTombstones(keepTombstones bool) BucketOption

func WithLazySegmentLoading added in v1.31.4

func WithLazySegmentLoading(lazyLoading bool) BucketOption

WithLazySegmentLoading makes segments initialize only when they are actually used, rather than at startup.

This option should be used:

  • Only for buckets that are NOT needed in every request. The object bucket, for example, is accessed for almost all operations anyway, so lazy loading gains nothing there.
  • Only for implicit requests (i.e. requests originating from auto-tenant activation). Explicit activation should always load all segments.
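The lazy-initialization pattern behind this option can be sketched with sync.Once. This is illustrative only; the real segment loading (mmap, parsing, bloom filters) is more involved:

```go
package main

import (
	"fmt"
	"sync"
)

// lazySegment defers its (potentially expensive) initialization until the
// first read, mirroring the idea behind WithLazySegmentLoading. The init
// body and the inits counter are illustrative stand-ins.
type lazySegment struct {
	once  sync.Once
	data  map[string][]byte
	inits int // counts how often init actually ran
}

func (s *lazySegment) init() {
	s.inits++
	// pretend: mmap the segment file, parse headers, load bloom filter
	s.data = map[string][]byte{"foo": []byte("bar")}
}

func (s *lazySegment) Get(key string) []byte {
	s.once.Do(s.init) // initialize on first use only
	return s.data[key]
}

func main() {
	s := &lazySegment{}
	fmt.Println(s.inits) // nothing loaded yet
	s.Get("foo")
	s.Get("foo")
	fmt.Println(s.inits) // loaded exactly once, on first access
}
```

A bucket that is never touched thus never pays the initialization cost, which is the whole point for rarely-accessed tenants.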

func WithLegacyMapSorting

func WithLegacyMapSorting() BucketOption

func WithMaxSegmentSize added in v1.24.12

func WithMaxSegmentSize(maxSegmentSize int64) BucketOption

func WithMemtableThreshold

func WithMemtableThreshold(threshold uint64) BucketOption

func WithMinMMapSize added in v1.27.21

func WithMinMMapSize(minMMapSize int64) BucketOption

func WithMinWalThreshold added in v1.29.8

func WithMinWalThreshold(threshold int64) BucketOption

func WithMonitorCount

func WithMonitorCount() BucketOption

func WithPread added in v1.21.0

func WithPread(with bool) BucketOption

func WithSecondaryIndices

func WithSecondaryIndices(count uint16) BucketOption

func WithSegmentsChecksumValidationEnabled added in v1.26.14

func WithSegmentsChecksumValidationEnabled(enable bool) BucketOption

func WithSegmentsCleanupInterval added in v1.25.21

func WithSegmentsCleanupInterval(interval time.Duration) BucketOption

func WithStrategy

func WithStrategy(strategy string) BucketOption

func WithUseBloomFilter added in v1.23.0

func WithUseBloomFilter(useBloomFilter bool) BucketOption

func WithWalThreshold

func WithWalThreshold(threshold uint64) BucketOption

func WithWriteMetadata added in v1.31.7

func WithWriteMetadata(writeMetadata bool) BucketOption

WithWriteMetadata enables writing all metadata (primary and secondary bloom filters, plus the count-net-additions (CNA) data) into a single file instead of separate files.

func WithWriteSegmentInfoIntoFileName added in v1.31.7

func WithWriteSegmentInfoIntoFileName(writeSegmentInfoIntoFileName bool) BucketOption

type BucketSlowLogEntries added in v1.31.18

type BucketSlowLogEntries []BucketSlowLogEntry

func (BucketSlowLogEntries) Reduce added in v1.31.18

type BucketSlowLogEntry added in v1.31.18

type BucketSlowLogEntry struct {
	Total            time.Duration
	View             time.Duration
	ActiveMemtable   time.Duration
	FlushingMemtable time.Duration
	Segments         time.Duration
	Recheck          time.Duration // only for secondary index reads
}

type BucketSlowLogEntryStats added in v1.31.18

type BucketSlowLogEntryStats struct {
	Total            DurationStats `json:"total"`
	View             DurationStats `json:"view"`
	ActiveMemtable   DurationStats `json:"activeMemtable"`
	FlushingMemtable DurationStats `json:"flushingMemtable"`
	Segments         DurationStats `json:"segments"`
	Recheck          DurationStats `json:"recheck"`
}

type BytesReadObserver added in v1.30.8

type BytesReadObserver func(bytes int64, nanoseconds int64)

type BytesWriteObserver added in v1.30.8

type BytesWriteObserver func(bytes int64)

type CommitType

type CommitType uint8
const (
	CommitTypeReplace CommitType = iota // replace strategy

	// collection strategy - this can handle all cases as updates and deletes are
	// only appends in a collection strategy
	CommitTypeCollection
	CommitTypeRoaringSet
	// new version of roaringset that stores data as a list of uint64 values,
	// instead of a roaring bitmap
	CommitTypeRoaringSetList
)

func (CommitType) Is added in v1.18.0

func (ct CommitType) Is(checkedCommitType CommitType) bool

func (CommitType) String added in v1.18.0

func (ct CommitType) String() string

type CursorMap

type CursorMap struct {
	// contains filtered or unexported fields
}

func (*CursorMap) Close

func (c *CursorMap) Close()

func (*CursorMap) First

func (c *CursorMap) First(ctx context.Context) ([]byte, []MapPair)

func (*CursorMap) Next

func (c *CursorMap) Next(ctx context.Context) ([]byte, []MapPair)

func (*CursorMap) Seek

func (c *CursorMap) Seek(ctx context.Context, key []byte) ([]byte, []MapPair)

type CursorReplace

type CursorReplace struct {
	// contains filtered or unexported fields
}

func (*CursorReplace) Close

func (c *CursorReplace) Close()

func (*CursorReplace) First

func (c *CursorReplace) First() ([]byte, []byte)

func (*CursorReplace) Next

func (c *CursorReplace) Next() ([]byte, []byte)

func (*CursorReplace) Seek

func (c *CursorReplace) Seek(key []byte) ([]byte, []byte)

type CursorRoaringSet added in v1.18.0

type CursorRoaringSet interface {
	First() ([]byte, *sroar.Bitmap)
	Next() ([]byte, *sroar.Bitmap)
	Seek([]byte) ([]byte, *sroar.Bitmap)
	Close()
}

type CursorSet

type CursorSet struct {
	// contains filtered or unexported fields
}

func (*CursorSet) Close

func (c *CursorSet) Close()

func (*CursorSet) First

func (c *CursorSet) First() ([]byte, [][]byte)

func (*CursorSet) Next

func (c *CursorSet) Next() ([]byte, [][]byte)

func (*CursorSet) Seek

func (c *CursorSet) Seek(key []byte) ([]byte, [][]byte)

type DurationStats added in v1.31.18

type DurationStats struct {
	Mean time.Duration `json:"mean"`
	Min  time.Duration `json:"min"`
	Max  time.Duration `json:"max"`
	P50  time.Duration `json:"p50"`
	P90  time.Duration `json:"p90"`
	P95  time.Duration `json:"p95"`
	P99  time.Duration `json:"p99"`
}

type MapListOption

type MapListOption func(c *MapListOptionConfig)

func MapListAcceptDuplicates

func MapListAcceptDuplicates() MapListOption

func MapListLegacySortingRequired

func MapListLegacySortingRequired() MapListOption

type MapListOptionConfig

type MapListOptionConfig struct {
	// contains filtered or unexported fields
}

type MapPair

type MapPair struct {
	Key       []byte
	Value     []byte
	Tombstone bool
}

func (MapPair) Bytes

func (kv MapPair) Bytes() ([]byte, error)

func (*MapPair) BytesInverted added in v1.28.0

func (kv *MapPair) BytesInverted() ([]byte, error)

func (MapPair) EncodeBytes

func (kv MapPair) EncodeBytes(buf []byte) error

func (MapPair) EncodeBytesInverted added in v1.28.0

func (kv MapPair) EncodeBytesInverted(buf []byte) error

func (*MapPair) FromBytes

func (kv *MapPair) FromBytes(in []byte, keyOnly bool) error

func (*MapPair) FromBytesInverted added in v1.28.0

func (kv *MapPair) FromBytesInverted(in []byte, keyOnly bool) error

func (*MapPair) FromBytesReusable

func (kv *MapPair) FromBytesReusable(in []byte, keyOnly bool) error

func (MapPair) Size

func (kv MapPair) Size() int

Size() returns the exact size in bytes that will be used when Bytes() is called
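The Size/Bytes contract can be illustrated with a simple length-prefixed encoding. The layout below is made up for the sketch and is not MapPair's actual wire format:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// pair is an illustrative key-value pair with a Size/Bytes contract like
// MapPair's: Size reports exactly how many bytes Bytes will emit, so
// callers can pre-allocate buffers without a trial encoding.
type pair struct {
	Key, Value []byte
}

// layout (illustrative): 2-byte key length, key, 2-byte value length, value.
func (p pair) Size() int {
	return 2 + len(p.Key) + 2 + len(p.Value)
}

func (p pair) Bytes() []byte {
	out := make([]byte, 0, p.Size())
	var l [2]byte
	binary.LittleEndian.PutUint16(l[:], uint16(len(p.Key)))
	out = append(out, l[:]...)
	out = append(out, p.Key...)
	binary.LittleEndian.PutUint16(l[:], uint16(len(p.Value)))
	out = append(out, l[:]...)
	out = append(out, p.Value...)
	return out
}

func main() {
	p := pair{Key: []byte("bar"), Value: []byte("17")}
	fmt.Println(p.Size(), len(p.Bytes())) // the two must always agree
}
```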

type Memtable

type Memtable struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Memtable) ActiveDuration

func (m *Memtable) ActiveDuration() time.Duration

func (*Memtable) DirtyDuration added in v1.24.2

func (m *Memtable) DirtyDuration() time.Duration

DirtyDuration returns how long the memtable has been dirty, i.e. the time since the first write occurred (0 if the memtable is clean).

func (*Memtable) GetPropLengths added in v1.28.14

func (m *Memtable) GetPropLengths() (uint64, uint64, error)

func (*Memtable) Path added in v1.31.18

func (m *Memtable) Path() string

func (*Memtable) ReadOnlyTombstones added in v1.28.12

func (m *Memtable) ReadOnlyTombstones() (*sroar.Bitmap, error)

func (*Memtable) SetTombstone added in v1.28.11

func (m *Memtable) SetTombstone(docId uint64) error

func (*Memtable) Size

func (m *Memtable) Size() uint64

type Metrics

type Metrics struct {

	// old-style metrics, to be migrated
	ActiveSegments               *prometheus.GaugeVec
	ObjectsBucketSegments        *prometheus.GaugeVec
	CompressedVecsBucketSegments *prometheus.GaugeVec
	SegmentObjects               *prometheus.GaugeVec
	SegmentSize                  *prometheus.GaugeVec
	SegmentCount                 *prometheus.GaugeVec
	SegmentUnloaded              *prometheus.GaugeVec

	DimensionSum      *prometheus.GaugeVec
	IOWrite           *prometheus.SummaryVec
	IORead            *prometheus.SummaryVec
	LazySegmentUnLoad prometheus.Gauge
	LazySegmentLoad   prometheus.Gauge
	LazySegmentClose  prometheus.Gauge
	LazySegmentInit   prometheus.Gauge
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(promMetrics *monitoring.PrometheusMetrics, className,
	shardName string,
) (*Metrics, error)

func (*Metrics) DecBucketInitInProgressByStrategy added in v1.31.17

func (m *Metrics) DecBucketInitInProgressByStrategy(strategy string)

func (*Metrics) DecBucketOpenCursorsByStrategy added in v1.31.17

func (m *Metrics) DecBucketOpenCursorsByStrategy(strategy string)

func (*Metrics) DecBucketReadOpOngoingByComponent added in v1.31.17

func (m *Metrics) DecBucketReadOpOngoingByComponent(op, component string)

func (*Metrics) DecBucketShutdownInProgressByStrategy added in v1.31.17

func (m *Metrics) DecBucketShutdownInProgressByStrategy(strategy string)

func (*Metrics) DecBucketWriteOpOngoing added in v1.31.17

func (m *Metrics) DecBucketWriteOpOngoing(op string)

func (*Metrics) DecCompactionInProgress added in v1.31.17

func (m *Metrics) DecCompactionInProgress(strategy string)

func (*Metrics) DecSegmentTotalByStrategy added in v1.31.17

func (m *Metrics) DecSegmentTotalByStrategy(strategy string)

func (*Metrics) DecWalRecoveryInProgress added in v1.31.17

func (m *Metrics) DecWalRecoveryInProgress(strategy string)

func (*Metrics) IncBucketInitCountByStrategy added in v1.31.17

func (m *Metrics) IncBucketInitCountByStrategy(strategy string)

bucket metrics

func (*Metrics) IncBucketInitFailureCountByStrategy added in v1.31.17

func (m *Metrics) IncBucketInitFailureCountByStrategy(strategy string)

func (*Metrics) IncBucketInitInProgressByStrategy added in v1.31.17

func (m *Metrics) IncBucketInitInProgressByStrategy(strategy string)

func (*Metrics) IncBucketOpenCursorsByStrategy added in v1.31.17

func (m *Metrics) IncBucketOpenCursorsByStrategy(strategy string)

func (*Metrics) IncBucketOpenedCursorsByStrategy added in v1.31.17

func (m *Metrics) IncBucketOpenedCursorsByStrategy(strategy string)

func (*Metrics) IncBucketReadOpCountByComponent added in v1.31.17

func (m *Metrics) IncBucketReadOpCountByComponent(op, component string)

func (*Metrics) IncBucketReadOpFailureCountByComponent added in v1.31.17

func (m *Metrics) IncBucketReadOpFailureCountByComponent(op, component string)

func (*Metrics) IncBucketReadOpOngoingByComponent added in v1.31.17

func (m *Metrics) IncBucketReadOpOngoingByComponent(op, component string)

func (*Metrics) IncBucketShutdownCountByStrategy added in v1.31.17

func (m *Metrics) IncBucketShutdownCountByStrategy(strategy string)

func (*Metrics) IncBucketShutdownFailureCountByStrategy added in v1.31.17

func (m *Metrics) IncBucketShutdownFailureCountByStrategy(strategy string)

func (*Metrics) IncBucketShutdownInProgressByStrategy added in v1.31.17

func (m *Metrics) IncBucketShutdownInProgressByStrategy(strategy string)

func (*Metrics) IncBucketWriteOpCount added in v1.31.17

func (m *Metrics) IncBucketWriteOpCount(op string)

func (*Metrics) IncBucketWriteOpFailureCount added in v1.31.17

func (m *Metrics) IncBucketWriteOpFailureCount(op string)

func (*Metrics) IncBucketWriteOpOngoing added in v1.31.17

func (m *Metrics) IncBucketWriteOpOngoing(op string)

func (*Metrics) IncCompactionCount added in v1.31.17

func (m *Metrics) IncCompactionCount(strategy string)

compaction metrics

func (*Metrics) IncCompactionFailureCount added in v1.31.17

func (m *Metrics) IncCompactionFailureCount(strategy string)

func (*Metrics) IncCompactionInProgress added in v1.31.17

func (m *Metrics) IncCompactionInProgress(strategy string)

func (*Metrics) IncCompactionNoOp added in v1.31.17

func (m *Metrics) IncCompactionNoOp(strategy string)

func (*Metrics) IncSegmentTotalByStrategy added in v1.31.17

func (m *Metrics) IncSegmentTotalByStrategy(strategy string)

func (*Metrics) IncWalRecoveryCount added in v1.31.17

func (m *Metrics) IncWalRecoveryCount(strategy string)

wal recovery metrics

func (*Metrics) IncWalRecoveryFailureCount added in v1.31.17

func (m *Metrics) IncWalRecoveryFailureCount(strategy string)

func (*Metrics) IncWalRecoveryInProgress added in v1.31.17

func (m *Metrics) IncWalRecoveryInProgress(strategy string)

func (*Metrics) MemtableOpObserver

func (m *Metrics) MemtableOpObserver(path, strategy, op string) NsObserver

func (*Metrics) MemtableSizeSetter

func (m *Metrics) MemtableSizeSetter(path, strategy string) Setter

func (*Metrics) MemtableWriteObserver added in v1.27.23

func (m *Metrics) MemtableWriteObserver(strategy, op string) BytesWriteObserver

func (*Metrics) ObjectCount

func (m *Metrics) ObjectCount(count int)

func (*Metrics) ObserveBucketCursorDurationByStrategy added in v1.31.17

func (m *Metrics) ObserveBucketCursorDurationByStrategy(strategy string, duration time.Duration)

func (*Metrics) ObserveBucketInitDurationByStrategy added in v1.31.17

func (m *Metrics) ObserveBucketInitDurationByStrategy(strategy string, duration time.Duration)

func (*Metrics) ObserveBucketReadOpDurationByComponent added in v1.31.17

func (m *Metrics) ObserveBucketReadOpDurationByComponent(op, component string, duration time.Duration)

func (*Metrics) ObserveBucketShutdownDurationByStrategy added in v1.31.17

func (m *Metrics) ObserveBucketShutdownDurationByStrategy(strategy string, duration time.Duration)

func (*Metrics) ObserveBucketWriteOpDuration added in v1.31.17

func (m *Metrics) ObserveBucketWriteOpDuration(op string, duration time.Duration)

func (*Metrics) ObserveCompactionDuration added in v1.31.17

func (m *Metrics) ObserveCompactionDuration(strategy string, duration time.Duration)

func (*Metrics) ObserveSegmentSize added in v1.31.17

func (m *Metrics) ObserveSegmentSize(strategy string, sizeBytes int64)

func (*Metrics) ObserveWalRecoveryDuration added in v1.31.17

func (m *Metrics) ObserveWalRecoveryDuration(strategy string, duration time.Duration)

func (*Metrics) ReadObserver added in v1.30.8

func (m *Metrics) ReadObserver(op string) BytesReadObserver

func (*Metrics) TrackStartupBucket

func (m *Metrics) TrackStartupBucket(start time.Time)

func (*Metrics) TrackStartupReadWALDiskIO

func (m *Metrics) TrackStartupReadWALDiskIO(read int64, nanoseconds int64)

type MockBucketCreator added in v1.25.0

type MockBucketCreator struct {
	mock.Mock
}

MockBucketCreator is an autogenerated mock type for the BucketCreator type

func NewMockBucketCreator added in v1.25.0

func NewMockBucketCreator(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockBucketCreator

NewMockBucketCreator creates a new instance of MockBucketCreator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockBucketCreator) EXPECT added in v1.27.18

func (*MockBucketCreator) NewBucket added in v1.25.0

func (_m *MockBucketCreator) NewBucket(ctx context.Context, dir string, rootDir string, logger logrus.FieldLogger, metrics *Metrics, compactionCallbacks cyclemanager.CycleCallbackGroup, flushCallbacks cyclemanager.CycleCallbackGroup, opts ...BucketOption) (*Bucket, error)

NewBucket provides a mock function with given fields: ctx, dir, rootDir, logger, metrics, compactionCallbacks, flushCallbacks, opts

type MockBucketCreator_Expecter added in v1.27.18

type MockBucketCreator_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockBucketCreator_Expecter) NewBucket added in v1.27.18

func (_e *MockBucketCreator_Expecter) NewBucket(ctx interface{}, dir interface{}, rootDir interface{}, logger interface{}, metrics interface{}, compactionCallbacks interface{}, flushCallbacks interface{}, opts ...interface{}) *MockBucketCreator_NewBucket_Call

NewBucket is a helper method to define mock.On call

  • ctx context.Context
  • dir string
  • rootDir string
  • logger logrus.FieldLogger
  • metrics *Metrics
  • compactionCallbacks cyclemanager.CycleCallbackGroup
  • flushCallbacks cyclemanager.CycleCallbackGroup
  • opts ...BucketOption

type MockBucketCreator_NewBucket_Call added in v1.27.18

type MockBucketCreator_NewBucket_Call struct {
	*mock.Call
}

MockBucketCreator_NewBucket_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewBucket'

func (*MockBucketCreator_NewBucket_Call) Return added in v1.27.18

func (*MockBucketCreator_NewBucket_Call) Run added in v1.27.18

func (_c *MockBucketCreator_NewBucket_Call) Run(run func(ctx context.Context, dir string, rootDir string, logger logrus.FieldLogger, metrics *Metrics, compactionCallbacks cyclemanager.CycleCallbackGroup, flushCallbacks cyclemanager.CycleCallbackGroup, opts ...BucketOption)) *MockBucketCreator_NewBucket_Call

type NsObserver

type NsObserver func(ns int64)

type ReaderRoaringSetRange added in v1.26.5

type ReaderRoaringSetRange interface {
	Read(ctx context.Context, value uint64, operator filters.Operator) (result *sroar.Bitmap, release func(), err error)
	Close()
}

type SecondaryKeyOption

type SecondaryKeyOption func(s secondaryIndexKeys) error

func WithSecondaryKey

func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption

type Segment added in v1.31.4

type Segment interface {
	Size() int64

	MergeTombstones(other *sroar.Bitmap) (*sroar.Bitmap, error)

	ReadOnlyTombstones() (*sroar.Bitmap, error)
	// contains filtered or unexported methods
}

type SegmentBlockMax added in v1.28.0

type SegmentBlockMax struct {
	Metrics BlockMetrics
	// contains filtered or unexported fields
}

func NewSegmentBlockMax added in v1.28.0

func NewSegmentBlockMax(s *segment, key []byte, queryTermIndex int, idf float64, propertyBoost float32, tombstones *sroar.Bitmap, filterDocIds helpers.AllowList, averagePropLength float64, config schema.BM25Config) *SegmentBlockMax

func NewSegmentBlockMaxDecoded added in v1.28.5

func NewSegmentBlockMaxDecoded(key []byte, queryTermIndex int, propertyBoost float32, filterDocIds helpers.AllowList, averagePropLength float64, config schema.BM25Config) *SegmentBlockMax

func NewSegmentBlockMaxTest added in v1.28.0

func NewSegmentBlockMaxTest(docCount uint64, blockEntries []*terms.BlockEntry, blockDatas []*terms.BlockData, propLengths map[uint64]uint32, key []byte, queryTermIndex int, idf float64, propertyBoost float32, tombstones *sroar.Bitmap, filterDocIds helpers.AllowList, averagePropLength float64, config schema.BM25Config, codecs []varenc.VarEncDataType) *SegmentBlockMax

func (*SegmentBlockMax) Advance added in v1.28.0

func (s *SegmentBlockMax) Advance()

func (*SegmentBlockMax) AdvanceAtLeast added in v1.28.0

func (s *SegmentBlockMax) AdvanceAtLeast(docId uint64)

func (*SegmentBlockMax) AdvanceAtLeastShallow added in v1.28.0

func (s *SegmentBlockMax) AdvanceAtLeastShallow(docId uint64)

func (*SegmentBlockMax) Count added in v1.28.0

func (s *SegmentBlockMax) Count() int

func (*SegmentBlockMax) CurrentBlockImpact added in v1.28.0

func (s *SegmentBlockMax) CurrentBlockImpact() float32

func (*SegmentBlockMax) CurrentBlockMaxId added in v1.28.0

func (s *SegmentBlockMax) CurrentBlockMaxId() uint64

func (*SegmentBlockMax) Exhausted added in v1.28.0

func (s *SegmentBlockMax) Exhausted() bool

func (*SegmentBlockMax) IdPointer added in v1.28.0

func (s *SegmentBlockMax) IdPointer() uint64

func (*SegmentBlockMax) Idf added in v1.28.0

func (s *SegmentBlockMax) Idf() float64

func (*SegmentBlockMax) QueryTerm added in v1.28.5

func (s *SegmentBlockMax) QueryTerm() string

func (*SegmentBlockMax) QueryTermIndex added in v1.28.0

func (s *SegmentBlockMax) QueryTermIndex() int

func (*SegmentBlockMax) Score added in v1.28.0

func (s *SegmentBlockMax) Score(averagePropLength float64, additionalExplanation bool) (uint64, float64, *terms.DocPointerWithScore)

func (*SegmentBlockMax) SetIdf added in v1.28.11

func (s *SegmentBlockMax) SetIdf(idf float64)

type SegmentGroup

type SegmentGroup struct {
	MinMMapSize int64
	// contains filtered or unexported fields
}

func (*SegmentGroup) GetAveragePropertyLength added in v1.28.14

func (sg *SegmentGroup) GetAveragePropertyLength() (float64, uint64)

func (*SegmentGroup) Len

func (sg *SegmentGroup) Len() int

func (*SegmentGroup) UpdateStatus added in v1.23.0

func (sg *SegmentGroup) UpdateStatus(status storagestate.Status)

type Setter

type Setter func(val uint64)

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store groups multiple buckets together; it "owns" one folder on the file system.

func New

func New(dir, rootDir string, logger logrus.FieldLogger, metrics *Metrics,
	shardCompactionCallbacks, shardCompactionAuxCallbacks,
	shardFlushCallbacks cyclemanager.CycleCallbackGroup,
) (*Store, error)

New initializes a new Store based on the root dir. If state is present on disk, it is loaded, if the folder is empty a new store is initialized in there.

func (*Store) Bucket

func (s *Store) Bucket(name string) *Bucket

func (*Store) CreateBucket added in v1.18.0

func (s *Store) CreateBucket(ctx context.Context, bucketName string,
	opts ...BucketOption,
) error

CreateBucket creates the bucket, first removing any files that already exist. The bucket cannot be registered in bucketsByName before that removal has happened.

func (*Store) CreateOrLoadBucket

func (s *Store) CreateOrLoadBucket(ctx context.Context, bucketName string,
	opts ...BucketOption,
) (err error)

CreateOrLoadBucket registers a bucket with the given name. If state on disk exists for this bucket it is loaded, otherwise it is created. Pass [BucketOption]s to configure the strategy of a bucket. The strategy defaults to "replace". For example, to load or create a replace-type bucket, do:

ctx := context.Background()
err := store.CreateOrLoadBucket(ctx, "my_bucket_name", WithStrategy(StrategyReplace))
if err != nil { /* handle error */ }

// you can now access the bucket using store.Bucket()
b := store.Bucket("my_bucket_name")

func (*Store) FlushMemtables

func (s *Store) FlushMemtables(ctx context.Context) error

FlushMemtables flushes any active memtables and returns only once each memtable has been fully flushed and a stable state on disk has been reached.

This is a preparatory stage for creating backups.

A timeout should be specified for the input context as some flushes are long-running, in which case it may be better to fail the backup attempt and retry later, than to block indefinitely.

func (*Store) GetBucketsByName added in v1.18.0

func (s *Store) GetBucketsByName() map[string]*Bucket

func (*Store) GetDir added in v1.30.19

func (s *Store) GetDir() string

func (*Store) ListFiles

func (s *Store) ListFiles(ctx context.Context, basePath string) ([]string, error)

func (*Store) PauseCompaction

func (s *Store) PauseCompaction(ctx context.Context) error

PauseCompaction waits for all ongoing compactions to finish, then makes sure that no new compaction can be started.

This is a preparatory stage for creating backups.

A timeout should be specified for the input context as some compactions are long-running, in which case it may be better to fail the backup attempt and retry later, than to block indefinitely.

func (*Store) PauseObjectBucketCompaction added in v1.30.11

func (s *Store) PauseObjectBucketCompaction(ctx context.Context) error

PauseObjectBucketCompaction pauses the compaction cycle for the objects bucket. This is so that the BMW migration can run without interference from the compaction process, as they both use the same locks.

func (*Store) RenameBucket added in v1.19.0

func (s *Store) RenameBucket(ctx context.Context, bucketName, newBucketName string) error

func (*Store) ReplaceBuckets added in v1.18.0

func (s *Store) ReplaceBuckets(ctx context.Context, bucketName, replacementBucketName string) error

ReplaceBuckets replaces the first bucket with the second one. Both buckets have to be registered in bucketsByName. The second bucket takes over the first one's entry in bucketsByName (under the first one's name), and the entry under the second one's name is deleted. The directory path of the second bucket is changed to the directory of the first bucket, as are all other related paths of bucket resources (segment group, memtables, commit log). The directory path of the first bucket is temporarily suffixed with "___del"; later that bucket is shut down and its files are deleted. In effect, the second bucket becomes the first bucket.

func (*Store) ResumeCompaction

func (s *Store) ResumeCompaction(ctx context.Context) error

ResumeCompaction starts the compaction cycle again. It errors if compactions were not previously paused.

func (*Store) ResumeObjectBucketCompaction added in v1.30.11

func (s *Store) ResumeObjectBucketCompaction(ctx context.Context) error

ResumeObjectBucketCompaction resumes the compaction cycle for the objects bucket.

func (*Store) Shutdown

func (s *Store) Shutdown(ctx context.Context) error

func (*Store) ShutdownBucket added in v1.29.11

func (s *Store) ShutdownBucket(ctx context.Context, bucketName string) error

func (*Store) UpdateBucketsStatus

func (s *Store) UpdateBucketsStatus(targetStatus storagestate.Status) error

func (*Store) WriteWALs

func (s *Store) WriteWALs() error

type Terms added in v1.28.5

type Terms []*SegmentBlockMax

func (Terms) Len added in v1.28.5

func (t Terms) Len() int

Len provides the sort.Interface implementation for Terms, together with Less and Swap below.

func (Terms) Less added in v1.28.5

func (t Terms) Less(i, j int) bool

func (Terms) Swap added in v1.28.5

func (t Terms) Swap(i, j int)

type TermsBySize added in v1.30.17

type TermsBySize []*SegmentBlockMax

func (TermsBySize) Len added in v1.30.17

func (t TermsBySize) Len() int

Len provides the sort.Interface implementation for TermsBySize, together with Less and Swap below.

func (TermsBySize) Less added in v1.30.17

func (t TermsBySize) Less(i, j int) bool

func (TermsBySize) Swap added in v1.30.17

func (t TermsBySize) Swap(i, j int)

type TimeObserver

type TimeObserver func(start time.Time)

Source Files

Directories

Path Synopsis
ent contains common types used throughout various lsmkv (sub-)packages
