Documentation
¶
Index ¶
- Constants
- Variables
- func IsClosed(err error) bool
- func IsCorrupted(err error) bool
- func IsError(err error, code ErrCode) bool
- func IsNotFound(err error) bool
- func WrapError(err error, format string, args ...any) error
- type BTreeBuilder
- type BTreeNode
- type BTreeReader
- type CompactionManager
- func (m *CompactionManager) ApplyConfig(opts *Options)
- func (m *CompactionManager) CleanupOrphanFiles()
- func (m *CompactionManager) DoCompaction(task *CompactionTask) error
- func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, version *Version) error
- func (m *CompactionManager) GetLevelSizeLimit(level int) int64
- func (m *CompactionManager) GetLevelStats() []LevelStats
- func (m *CompactionManager) GetPicker() *Picker
- func (m *CompactionManager) GetStats() *CompactionStats
- func (m *CompactionManager) MaybeCompact()
- func (m *CompactionManager) SetSchema(schema *Schema)
- func (m *CompactionManager) Start()
- func (m *CompactionManager) Stop()
- func (m *CompactionManager) TriggerCompaction() error
- type CompactionStats
- type CompactionTask
- type Compactor
- type Database
- func (db *Database) Clean() error
- func (db *Database) CleanTable(name string) error
- func (db *Database) Close() error
- func (db *Database) CreateTable(name string, schema *Schema) (*Table, error)
- func (db *Database) Destroy() error
- func (db *Database) DestroyTable(name string) error
- func (db *Database) DropTable(name string) error
- func (db *Database) GetAllTablesInfo() map[string]*Table
- func (db *Database) GetTable(name string) (*Table, error)
- func (db *Database) ListTables() []string
- type EditType
- type ErrCode
- type Error
- type Expr
- func And(exprs ...Expr) Expr
- func Between(field string, min, max any) Expr
- func Contains(field string, pattern string) Expr
- func EndsWith(field string, suffix string) Expr
- func Eq(field string, value any) Expr
- func Gt(field string, value any) Expr
- func Gte(field string, value any) Expr
- func In(field string, values []any) Expr
- func IsNull(field string) Expr
- func Lt(field string, value any) Expr
- func Lte(field string, value any) Expr
- func Not(expr Expr) Expr
- func NotBetween(field string, min, max any) Expr
- func NotContains(field string, pattern string) Expr
- func NotEndsWith(field string, suffix string) Expr
- func NotEq(field string, value any) Expr
- func NotIn(field string, values []any) Expr
- func NotNull(field string) Expr
- func NotStartsWith(field string, prefix string) Expr
- func Or(exprs ...Expr) Expr
- func StartsWith(field string, prefix string) Expr
- type Field
- type FieldType
- type Fieldset
- type FileMetadata
- type ImmutableMemTable
- type IndexBTreeReader
- type IndexBTreeWriter
- type IndexEntryCallback
- type IndexHeader
- type IndexManager
- func (m *IndexManager) AddToIndexes(data map[string]any, seq int64) error
- func (m *IndexManager) BuildAll() error
- func (m *IndexManager) Close() error
- func (m *IndexManager) CreateIndex(field string) error
- func (m *IndexManager) DropIndex(field string) error
- func (m *IndexManager) GetIndex(field string) (*SecondaryIndex, bool)
- func (m *IndexManager) GetIndexMetadata() map[string]IndexMetadata
- func (m *IndexManager) ListIndexes() []string
- func (m *IndexManager) VerifyAndRepair(currentMaxSeq int64, getData func(int64) (map[string]any, error)) error
- type IndexMetadata
- type KeyCallback
- type LevelStats
- type ManifestReader
- type ManifestWriter
- type MemTable
- type MemTableIterator
- type MemTableManager
- func (m *MemTableManager) Clear()
- func (m *MemTableManager) Get(key int64) ([]byte, bool)
- func (m *MemTableManager) GetActive() *MemTable
- func (m *MemTableManager) GetActiveCount() int
- func (m *MemTableManager) GetActiveSize() int64
- func (m *MemTableManager) GetImmutableCount() int
- func (m *MemTableManager) GetImmutables() []*ImmutableMemTable
- func (m *MemTableManager) GetStats() *MemTableStats
- func (m *MemTableManager) NewIterator() *MemTableIterator
- func (m *MemTableManager) Put(key int64, value []byte)
- func (m *MemTableManager) RemoveImmutable(target *ImmutableMemTable)
- func (m *MemTableManager) SetActiveWAL(walNumber int64)
- func (m *MemTableManager) ShouldSwitch() bool
- func (m *MemTableManager) Switch(newWALNumber int64) (oldWALNumber int64, immutable *ImmutableMemTable)
- func (m *MemTableManager) TotalCount() int
- func (m *MemTableManager) TotalSize() int64
- type MemTableStats
- type Metadata
- type Neginative
- type Options
- type Picker
- type QueryBuilder
- func (qb *QueryBuilder) Between(field string, start, end any) *QueryBuilder
- func (qb *QueryBuilder) Contains(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) EndsWith(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) Eq(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) First() (*Row, error)
- func (qb *QueryBuilder) Gt(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) Gte(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) In(field string, values []any) *QueryBuilder
- func (qb *QueryBuilder) IsNull(field string) *QueryBuilder
- func (qb *QueryBuilder) Last() (*Row, error)
- func (qb *QueryBuilder) Limit(n int) *QueryBuilder
- func (qb *QueryBuilder) Lt(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) Lte(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) Match(data map[string]any) bool
- func (qb *QueryBuilder) NotBetween(field string, start, end any) *QueryBuilder
- func (qb *QueryBuilder) NotContains(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) NotEndsWith(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) NotEq(field string, value any) *QueryBuilder
- func (qb *QueryBuilder) NotIn(field string, values []any) *QueryBuilder
- func (qb *QueryBuilder) NotNull(field string) *QueryBuilder
- func (qb *QueryBuilder) NotStartsWith(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) Offset(n int) *QueryBuilder
- func (qb *QueryBuilder) OrderBy(field string) *QueryBuilder
- func (qb *QueryBuilder) OrderByDesc(field string) *QueryBuilder
- func (qb *QueryBuilder) Paginate(page, pageSize int) (rows *Rows, total int, err error)
- func (qb *QueryBuilder) Rows() (*Rows, error)
- func (qb *QueryBuilder) Scan(value any) error
- func (qb *QueryBuilder) Select(fields ...string) *QueryBuilder
- func (qb *QueryBuilder) StartsWith(field string, pattern string) *QueryBuilder
- func (qb *QueryBuilder) Where(exprs ...Expr) *QueryBuilder
- type Row
- type Rows
- func (r *Rows) Close() error
- func (r *Rows) Collect() []map[string]any
- func (r *Rows) Count() int
- func (r *Rows) Data() []map[string]any
- func (r *Rows) Err() error
- func (r *Rows) First() (*Row, error)
- func (r *Rows) Last() (*Row, error)
- func (r *Rows) Len() int
- func (r *Rows) Next() bool
- func (r *Rows) Row() *Row
- func (r *Rows) Scan(value any) error
- type SSTableHeader
- type SSTableManager
- func (m *SSTableManager) AddReader(reader *SSTableReader)
- func (m *SSTableManager) Close() error
- func (m *SSTableManager) Count() int
- func (m *SSTableManager) CreateSST(fileNumber int64, rows []*SSTableRow) (*SSTableReader, error)
- func (m *SSTableManager) CreateSSTWithLevel(fileNumber int64, rows []*SSTableRow, level int) (*SSTableReader, error)
- func (m *SSTableManager) Get(seq int64) (*SSTableRow, error)
- func (m *SSTableManager) GetMaxSeq() int64
- func (m *SSTableManager) GetPartial(seq int64, fields []string) (*SSTableRow, error)
- func (m *SSTableManager) GetReaders() []*SSTableReader
- func (m *SSTableManager) GetStats() *SSTableStats
- func (m *SSTableManager) ListFiles() []string
- func (m *SSTableManager) RemoveReader(fileNumber int64) error
- func (m *SSTableManager) SetSchema(schema *Schema)
- type SSTableReader
- func (r *SSTableReader) Close() error
- func (r *SSTableReader) ForEach(callback KeyCallback)
- func (r *SSTableReader) ForEachDesc(callback KeyCallback)
- func (r *SSTableReader) Get(key int64) (*SSTableRow, error)
- func (r *SSTableReader) GetAllKeys() []int64
- func (r *SSTableReader) GetHeader() *SSTableHeader
- func (r *SSTableReader) GetPartial(key int64, fields []string) (*SSTableRow, error)
- func (r *SSTableReader) GetPath() string
- func (r *SSTableReader) SetSchema(schema *Schema)
- type SSTableRow
- type SSTableStats
- type SSTableWriter
- type Schema
- func (s *Schema) ComputeChecksum() (string, error)
- func (s *Schema) ExtractIndexValue(field string, data map[string]any) (any, error)
- func (s *Schema) GetField(name string) (*Field, error)
- func (s *Schema) GetIndexedFields() []Field
- func (s *Schema) Validate(data map[string]any) error
- func (s *Schema) ValidateType(typ FieldType, value any) error
- type SchemaFile
- type SecondaryIndex
- func (idx *SecondaryIndex) Add(value any, seq int64) error
- func (idx *SecondaryIndex) Build() error
- func (idx *SecondaryIndex) Close() error
- func (idx *SecondaryIndex) ForEach(callback IndexEntryCallback) error
- func (idx *SecondaryIndex) ForEachDesc(callback IndexEntryCallback) error
- func (idx *SecondaryIndex) Get(value any) ([]int64, error)
- func (idx *SecondaryIndex) GetMetadata() IndexMetadata
- func (idx *SecondaryIndex) IncrementalUpdate(getData func(int64) (map[string]any, error), fromSeq, toSeq int64) error
- func (idx *SecondaryIndex) IsReady() bool
- func (idx *SecondaryIndex) NeedsUpdate(currentMaxSeq int64) bool
- type Table
- func (t *Table) BuildIndexes() error
- func (t *Table) Clean() error
- func (t *Table) Close() error
- func (t *Table) CreateIndex(field string) error
- func (t *Table) Destroy() error
- func (t *Table) DropIndex(field string) error
- func (t *Table) Flush() error
- func (t *Table) Get(seq int64) (*SSTableRow, error)
- func (t *Table) GetCompactionManager() *CompactionManager
- func (t *Table) GetDir() string
- func (t *Table) GetIndex(field string) (*SecondaryIndex, bool)
- func (t *Table) GetIndexMetadata() map[string]IndexMetadata
- func (t *Table) GetMaxSeq() int64
- func (t *Table) GetMemtableManager() *MemTableManager
- func (t *Table) GetName() string
- func (t *Table) GetPartial(seq int64, fields []string) (*SSTableRow, error)
- func (t *Table) GetSSTManager() *SSTableManager
- func (t *Table) GetSchema() *Schema
- func (t *Table) GetVersionSet() *VersionSet
- func (t *Table) Insert(data any) error
- func (t *Table) ListIndexes() []string
- func (t *Table) Query() *QueryBuilder
- func (t *Table) RepairIndexes() error
- func (t *Table) SetLogger(logger *slog.Logger)
- func (t *Table) Stats() *TableStats
- type TableInfo
- type TableOptions
- type TableStats
- type Version
- func (v *Version) Apply(edit *VersionEdit)
- func (v *Version) Clone() *Version
- func (v *Version) GetFileCount() int
- func (v *Version) GetLastSequence() int64
- func (v *Version) GetLevel(level int) []*FileMetadata
- func (v *Version) GetLevelFileCount(level int) int
- func (v *Version) GetNextFileNumber() int64
- func (v *Version) GetSSTFiles() []*FileMetadata
- type VersionEdit
- type VersionSet
- func (vs *VersionSet) AllocateFileNumber() int64
- func (vs *VersionSet) Close() error
- func (vs *VersionSet) GetCurrent() *Version
- func (vs *VersionSet) GetLastSequence() int64
- func (vs *VersionSet) GetNextFileNumber() int64
- func (vs *VersionSet) LogAndApply(edit *VersionEdit) error
- func (vs *VersionSet) SetLastSequence(seq int64)
- type WAL
- type WALEntry
- type WALManager
- func (m *WALManager) Append(entry *WALEntry) error
- func (m *WALManager) Close() error
- func (m *WALManager) Delete(number int64) error
- func (m *WALManager) GetCurrentNumber() int64
- func (m *WALManager) ListWALFiles() ([]string, error)
- func (m *WALManager) RecoverAll() ([]*WALEntry, error)
- func (m *WALManager) Rotate() (int64, error)
- func (m *WALManager) Sync() error
- type WALReader
Constants ¶
const ( BTreeNodeSize = 4096 // 节点大小 (4 KB) BTreeOrder = 200 // B+Tree 阶数 (保守估计,叶子节点每个entry 20 bytes) BTreeHeaderSize = 32 // 节点头大小 BTreeNodeTypeInternal = 0 // 内部节点 BTreeNodeTypeLeaf = 1 // 叶子节点 )
const ( IndexHeaderSize = 256 // 索引文件头大小 IndexMagic = 0x49445842 // "IDXB" - Index B-Tree IndexVersion = 1 // 文件格式版本 )
const ( // 文件格式 SSTableMagicNumber = 0x53535433 // "SST3" SSTableVersion = 1 SSTableHeaderSize = 256 // 文件头大小 SSTableBlockSize = 64 * 1024 // 数据块大小 (64 KB) // 二进制编码格式: // [Magic: 4 bytes][Seq: 8 bytes][Time: 8 bytes][DataLen: 4 bytes][Data: variable] SSTableRowMagic = 0x524F5731 // "ROW1" )
const ( DefaultMemTableSize = 64 * 1024 * 1024 // 64 MB DefaultAutoFlushTimeout = 30 * time.Second // 30 秒无写入自动 flush )
const ( // Entry 类型 WALEntryTypePut = 1 WALEntryTypeDelete = 2 // 预留,暂不支持 // Entry Header 大小 WALEntryHeaderSize = 17 // CRC32(4) + Length(4) + Type(1) + Seq(8) )
const (
NumLevels = 4 // L0-L3
)
Variables ¶
var ( // ErrNotFound 数据未找到 ErrNotFound = NewError(ErrCodeNotFound, nil) // ErrClosed 对象已关闭 ErrClosed = NewError(ErrCodeClosed, nil) // ErrInvalidData 无效数据 ErrInvalidData = NewError(ErrCodeInvalidData, nil) // ErrCorrupted 数据损坏 ErrCorrupted = NewError(ErrCodeCorrupted, nil) )
预定义的常用错误(向后兼容)
var ( ErrDatabaseNotFound = NewError(ErrCodeDatabaseNotFound, nil) ErrDatabaseExists = NewError(ErrCodeDatabaseExists, nil) ErrDatabaseClosed = NewError(ErrCodeDatabaseClosed, nil) )
数据库错误(向后兼容)
var ( ErrTableNotFound = NewError(ErrCodeTableNotFound, nil) ErrTableExists = NewError(ErrCodeTableExists, nil) ErrTableClosed = NewError(ErrCodeTableClosed, nil) )
表错误(向后兼容)
var ( ErrSchemaNotFound = NewError(ErrCodeSchemaNotFound, nil) ErrSchemaInvalid = NewError(ErrCodeSchemaInvalid, nil) ErrSchemaMismatch = NewError(ErrCodeSchemaMismatch, nil) ErrSchemaValidationFailed = NewError(ErrCodeSchemaValidationFailed, nil) ErrSchemaChecksumMismatch = NewError(ErrCodeSchemaChecksumMismatch, nil) )
Schema 错误(向后兼容)
var ( ErrFieldNotFound = NewError(ErrCodeFieldNotFound, nil) ErrFieldTypeMismatch = NewError(ErrCodeFieldTypeMismatch, nil) ErrFieldRequired = NewError(ErrCodeFieldRequired, nil) )
字段错误(向后兼容)
var ( ErrIndexNotFound = NewError(ErrCodeIndexNotFound, nil) ErrIndexExists = NewError(ErrCodeIndexExists, nil) ErrIndexNotReady = NewError(ErrCodeIndexNotReady, nil) ErrIndexCorrupted = NewError(ErrCodeIndexCorrupted, nil) )
索引错误(向后兼容)
var ( ErrInvalidFormat = NewError(ErrCodeInvalidFormat, nil) ErrUnsupportedVersion = NewError(ErrCodeUnsupportedVersion, nil) ErrInvalidMagicNumber = NewError(ErrCodeInvalidMagicNumber, nil) ErrChecksumMismatch = NewError(ErrCodeChecksumMismatch, nil) )
文件格式错误(向后兼容)
var ( ErrWALCorrupted = NewError(ErrCodeWALCorrupted, nil) ErrWALClosed = NewError(ErrCodeWALClosed, nil) )
WAL 错误(向后兼容)
var ( ErrSSTableNotFound = NewError(ErrCodeSSTableNotFound, nil) ErrSSTableCorrupted = NewError(ErrCodeSSTableCorrupted, nil) )
SSTable 错误(向后兼容)
var ( ErrCompactionInProgress = NewError(ErrCodeCompactionInProgress, nil) ErrNoCompactionNeeded = NewError(ErrCodeNoCompactionNeeded, nil) )
Compaction 错误(向后兼容)
var ( ErrEncodeFailed = NewError(ErrCodeEncodeFailed, nil) ErrDecodeFailed = NewError(ErrCodeDecodeFailed, nil) )
编解码错误(向后兼容)
Functions ¶
Types ¶
type BTreeBuilder ¶
type BTreeBuilder struct {
// contains filtered or unexported fields
}
BTreeBuilder 从下往上构建 B+Tree
构建流程:
Add(): 添加所有 (key, offset, size) 到叶子节点 - 当叶子节点满时,创建新的叶子节点 - 所有叶子节点按 key 有序
Build(): 从叶子层向上构建 - Level 0: 叶子节点(已创建) - Level 1: 为叶子节点创建父节点(内部节点) - Level 2+: 递归创建更高层级 - 最终返回根节点偏移量
示例(100 个 key,Order=200):
- 叶子层: 1 个叶子节点(100 个 key)
- 根节点: 叶子节点本身
示例(500 个 key,Order=200):
- 叶子层: 3 个叶子节点(200, 200, 100 个 key)
- Level 1: 1 个内部节点(3 个子节点)
- 根节点: Level 1 的内部节点
func NewBTreeBuilder ¶
func NewBTreeBuilder(file *os.File, startOffset int64) *BTreeBuilder
NewBTreeBuilder 创建构建器
func (*BTreeBuilder) Add ¶
func (b *BTreeBuilder) Add(key int64, dataOffset int64, dataSize int32) error
Add 添加一个 key-value 对 (数据必须已排序)
func (*BTreeBuilder) Build ¶
func (b *BTreeBuilder) Build() (rootOffset int64, err error)
Build 构建完整的 B+Tree,返回根节点的 offset
type BTreeNode ¶
type BTreeNode struct {
// Header (32 bytes)
NodeType byte // 0=Internal, 1=Leaf
KeyCount uint16 // key 数量
Level byte // 层级 (0=叶子层)
Reserved [28]byte // 预留字段
// Keys (variable, 最多 256 个)
Keys []int64 // key 数组
// Values (variable)
// Internal Node: 子节点指针
Children []int64 // 子节点的文件 offset
// Leaf Node: 数据位置
DataOffsets []int64 // 数据块的文件 offset
DataSizes []int32 // 数据块大小
}
BTreeNode 表示一个 B+Tree 节点 (4 KB)
func UnmarshalBTree ¶
UnmarshalBTree 从字节数组反序列化节点
参数:
data: 4KB 节点数据(通常来自 mmap)
返回:
*BTreeNode: 反序列化后的节点
零拷贝优化:
- 直接从 mmap 数据读取,不复制整个节点
- 只复制必要的字段(Keys, Children, DataOffsets, DataSizes)
func (*BTreeNode) Marshal ¶
Marshal 序列化节点到 4 KB
布局:
[Header: 32B] [Keys: KeyCount * 8B] [Values: 取决于节点类型] - Internal: Children (KeyCount+1) * 8B - Leaf: 交错存储 (Offset, Size) 对,每对 12B,共 KeyCount * 12B
示例(叶子节点,KeyCount=3):
Offset | Size | Content -------|------|---------------------------------- 0 | 1 | NodeType = 1 (Leaf) 1 | 2 | KeyCount = 3 3 | 1 | Level = 0 4 | 28 | Reserved 32 | 24 | Keys [100, 200, 300] 56 | 8 | DataOffset0 = 1000 64 | 4 | DataSize0 = 50 68 | 8 | DataOffset1 = 2000 76 | 4 | DataSize1 = 60 80 | 8 | DataOffset2 = 3000 88 | 4 | DataSize2 = 70 92 | 4004 | Padding (unused)
type BTreeReader ¶
type BTreeReader struct {
// contains filtered or unexported fields
}
BTreeReader 用于查询 B+Tree (mmap)
查询流程:
- 从根节点开始
- 如果是内部节点: - 二分查找确定子节点 - 跳转到子节点继续查找
- 如果是叶子节点: - 二分查找 key - 返回 (dataOffset, dataSize)
性能优化:
- mmap 零拷贝:直接从内存映射读取节点
- 二分查找:O(log KeyCount) 在节点内查找
- 总复杂度:O(log n) = O(height * log Order)
示例(100万条数据,Order=200):
- 高度: log₂₀₀(1000000) ≈ 3
- 查询次数: 3 次节点读取 + 3 次二分查找
func NewBTreeReader ¶
func NewBTreeReader(mmap mmap.MMap, rootOffset int64) *BTreeReader
NewBTreeReader 创建查询器
func (*BTreeReader) ForEach ¶
func (r *BTreeReader) ForEach(callback KeyCallback)
ForEach 升序迭代所有 key(支持提前终止)
使用场景:
- 需要遍历数据但不想一次性加载所有 keys(节省内存)
- 支持条件过滤,找到目标后提前终止
- 支持外部自定义处理逻辑
示例:
// 找到第一个 > 100 的 key
reader.ForEach(func(key int64, offset int64, size int32) bool {
if key > 100 {
fmt.Printf("Found: %d\n", key)
return false // 停止迭代
}
return true // 继续
})
func (*BTreeReader) ForEachDesc ¶
func (r *BTreeReader) ForEachDesc(callback KeyCallback)
ForEachDesc 降序迭代所有 key(支持提前终止)
使用场景:
- 从最新数据开始遍历(时序数据库常见需求)
- 查找最近的 N 条记录
- 支持条件过滤和提前终止
示例:
// 获取最新的 10 条记录
count := 0
reader.ForEachDesc(func(key int64, offset int64, size int32) bool {
fmt.Printf("Key: %d\n", key)
count++
return count < 10 // 找到 10 条后停止
})
func (*BTreeReader) Get ¶
func (r *BTreeReader) Get(key int64) (dataOffset int64, dataSize int32, found bool)
Get 查询 key,返回数据位置
参数:
key: 要查询的 key
返回:
dataOffset: 数据块的文件偏移量 dataSize: 数据块的大小 found: 是否找到
查询流程:
- 从根节点开始遍历
- 内部节点:二分查找确定子节点,跳转
- 叶子节点:二分查找 key,返回数据位置
func (*BTreeReader) GetAllKeys ¶
func (r *BTreeReader) GetAllKeys() []int64
GetAllKeys 获取 B+Tree 中所有的 key(按升序)
func (*BTreeReader) GetAllKeysDesc ¶
func (r *BTreeReader) GetAllKeysDesc() []int64
GetAllKeysDesc 获取 B+Tree 中所有的 key(按降序)
性能优化:
- 从右到左遍历叶子节点
- 每个叶子节点内从后往前读取 keys
- 避免额外的排序操作
type CompactionManager ¶
type CompactionManager struct {
// contains filtered or unexported fields
}
CompactionManager 管理 Compaction 流程
func NewCompactionManager ¶
func NewCompactionManager(sstDir string, versionSet *VersionSet, sstManager *SSTableManager) *CompactionManager
NewCompactionManager 创建新的 Compaction Manager(使用默认配置)
func (*CompactionManager) ApplyConfig ¶
func (m *CompactionManager) ApplyConfig(opts *Options)
ApplyConfig 应用数据库级配置(从 Database Options)
func (*CompactionManager) CleanupOrphanFiles ¶
func (m *CompactionManager) CleanupOrphanFiles()
CleanupOrphanFiles 手动触发孤儿文件清理(可在启动时调用)
func (*CompactionManager) DoCompaction ¶
func (m *CompactionManager) DoCompaction(task *CompactionTask) error
DoCompaction 执行一次 Compaction(兼容旧接口)
func (*CompactionManager) DoCompactionWithVersion ¶
func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, version *Version) error
DoCompactionWithVersion 使用指定的版本执行 Compaction
func (*CompactionManager) GetLevelSizeLimit ¶
func (m *CompactionManager) GetLevelSizeLimit(level int) int64
GetLevelSizeLimit 获取指定层级的大小限制(公开方法,供 WebUI 等外部使用) 注意:Options 是不可变的,配置在 ApplyConfig 后不会改变,因此不需要加锁
func (*CompactionManager) GetLevelStats ¶
func (m *CompactionManager) GetLevelStats() []LevelStats
GetLevelStats 获取每层的统计信息
func (*CompactionManager) GetPicker ¶
func (m *CompactionManager) GetPicker() *Picker
GetPicker 获取 Compaction Picker
func (*CompactionManager) GetStats ¶
func (m *CompactionManager) GetStats() *CompactionStats
GetStats 获取 Compaction 统计信息
func (*CompactionManager) MaybeCompact ¶
func (m *CompactionManager) MaybeCompact()
MaybeCompact 检查是否需要 Compaction 并执行(公开方法,供外部调用) 非阻塞:如果已有 compaction 在执行,直接返回
func (*CompactionManager) SetSchema ¶
func (m *CompactionManager) SetSchema(schema *Schema)
SetSchema 设置 Schema(用于优化 SST 文件读写)
func (*CompactionManager) TriggerCompaction ¶
func (m *CompactionManager) TriggerCompaction() error
TriggerCompaction 手动触发一次 Compaction(遍历所有阶段)
type CompactionStats ¶
type CompactionStats struct {
TotalCompactions int64 `json:"total_compactions"` // 总 compaction 次数
LastCompactionTime time.Time `json:"last_compaction_time"` // 最后一次 compaction 时间
}
CompactionStats Compaction 统计信息
type CompactionTask ¶
type CompactionTask struct {
Level int // 源层级
InputFiles []*FileMetadata // 需要合并的输入文件
OutputLevel int // 输出层级
}
CompactionTask 表示一个 Compaction 任务
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
Compactor 负责执行 Compaction
func NewCompactor ¶
func NewCompactor(sstDir string, versionSet *VersionSet) *Compactor
NewCompactor 创建新的 Compactor
func (*Compactor) DoCompaction ¶
func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*VersionEdit, error)
DoCompaction 执行一次 Compaction 返回: VersionEdit (记录变更), error
type Database ¶
type Database struct {
// contains filtered or unexported fields
}
Database 数据库,管理多个表
func OpenWithOptions ¶
OpenWithOptions 使用指定配置打开数据库
func (*Database) CleanTable ¶
CleanTable 清除指定表的数据(保留表结构)
func (*Database) CreateTable ¶
CreateTable 创建表
func (*Database) DestroyTable ¶
DestroyTable 销毁指定表并从 Database 中删除
func (*Database) GetAllTablesInfo ¶
GetAllTablesInfo 获取所有表的信息(用于 WebUI)
type ErrCode ¶
type ErrCode int
ErrCode 错误码类型
const ( // 通用错误 (1000-1999) ErrCodeNotFound ErrCode = 1000 // 数据未找到 ErrCodeClosed ErrCode = 1001 // 对象已关闭 ErrCodeInvalidData ErrCode = 1002 // 无效数据 ErrCodeCorrupted ErrCode = 1003 // 数据损坏 ErrCodeExists ErrCode = 1004 // 对象已存在 ErrCodeInvalidParam ErrCode = 1005 // 无效参数 // 数据库错误 (2000-2999) ErrCodeDatabaseNotFound ErrCode = 2000 // 数据库不存在 ErrCodeDatabaseExists ErrCode = 2001 // 数据库已存在 ErrCodeDatabaseClosed ErrCode = 2002 // 数据库已关闭 // 表错误 (3000-3999) ErrCodeTableNotFound ErrCode = 3000 // 表不存在 ErrCodeTableExists ErrCode = 3001 // 表已存在 ErrCodeTableClosed ErrCode = 3002 // 表已关闭 // Schema 错误 (4000-4999) ErrCodeSchemaNotFound ErrCode = 4000 // Schema 不存在 ErrCodeSchemaInvalid ErrCode = 4001 // Schema 无效 ErrCodeSchemaMismatch ErrCode = 4002 // Schema 不匹配 ErrCodeSchemaValidationFailed ErrCode = 4003 // Schema 验证失败 ErrCodeSchemaChecksumMismatch ErrCode = 4004 // Schema 校验和不匹配 // 字段错误 (5000-5999) ErrCodeFieldNotFound ErrCode = 5000 // 字段不存在 ErrCodeFieldTypeMismatch ErrCode = 5001 // 字段类型不匹配 ErrCodeFieldRequired ErrCode = 5002 // 必填字段缺失 // 索引错误 (6000-6999) ErrCodeIndexNotFound ErrCode = 6000 // 索引不存在 ErrCodeIndexExists ErrCode = 6001 // 索引已存在 ErrCodeIndexNotReady ErrCode = 6002 // 索引未就绪 ErrCodeIndexCorrupted ErrCode = 6003 // 索引损坏 // 文件格式错误 (7000-7999) ErrCodeInvalidFormat ErrCode = 7000 // 无效的文件格式 ErrCodeUnsupportedVersion ErrCode = 7001 // 不支持的版本 ErrCodeInvalidMagicNumber ErrCode = 7002 // 无效的魔数 ErrCodeChecksumMismatch ErrCode = 7003 // 校验和不匹配 // WAL 错误 (8000-8999) ErrCodeWALCorrupted ErrCode = 8000 // WAL 文件损坏 ErrCodeWALClosed ErrCode = 8001 // WAL 已关闭 // SSTable 错误 (9000-9999) ErrCodeSSTableNotFound ErrCode = 9000 // SSTable 文件不存在 ErrCodeSSTableCorrupted ErrCode = 9001 // SSTable 文件损坏 // Compaction 错误 (10000-10999) ErrCodeCompactionInProgress ErrCode = 10000 // Compaction 正在进行 ErrCodeNoCompactionNeeded ErrCode = 10001 // 不需要 Compaction // 编解码错误 (11000-11999) ErrCodeEncodeFailed ErrCode = 11000 // 编码失败 ErrCodeDecodeFailed ErrCode = 11001 // 解码失败 )
错误码定义
type Error ¶
Error 错误类型
type Expr ¶
func NotBetween ¶
func NotContains ¶
func NotEndsWith ¶
func NotStartsWith ¶
func StartsWith ¶
type Field ¶
type Field struct {
Name string // 字段名
Type FieldType // 字段类型
Indexed bool // 是否建立索引
Nullable bool // 是否允许 NULL 值
Comment string // 注释
}
Field 字段定义
func StructToFields ¶
StructToFields 从 Go 结构体生成 Field 列表
支持的 struct tag 格式:
- `srdb:"name"` - 指定字段名(默认使用 snake_case 转换)
- `srdb:"name;indexed"` - 指定字段名并标记为索引
- `srdb:"name;nullable"` - 指定字段名并标记为可空
- `srdb:"name;indexed;nullable;comment:用户名"` - 完整格式
- `srdb:"-"` - 忽略该字段
Tag 格式说明:
- 使用分号 `;` 分隔不同的部分
- 第一部分是字段名(可选,默认使用 snake_case 转换结构体字段名)
- `indexed` 标记该字段需要索引
- `nullable` 标记该字段允许 NULL 值
- `comment:注释内容` 指定字段注释
默认字段名转换示例:
- UserName -> user_name
- EmailAddress -> email_address
- IsActive -> is_active
类型映射(精确映射到 Go 基础类型):
- int -> FieldTypeInt
- int8 -> Int8
- int16 -> Int16
- int32 -> Int32
- int64 -> Int64
- uint -> Uint
- uint8 (byte) -> Uint8 或 Byte
- uint16 -> Uint16
- uint32 -> Uint32
- uint64 -> Uint64
- float32 -> Float32
- float64 -> Float64
- string -> String
- bool -> Bool
- rune -> Rune
- decimal.Decimal -> Decimal
示例:
type User struct {
Name string `srdb:"field:name;indexed;comment:用户名"`
Age int64 `srdb:"field:age;comment:年龄"`
Email *string `srdb:"field:email;nullable;comment:邮箱(可选)"`
}
fields, err := StructToFields(User{})
参数:
- v: 结构体实例或指针
返回:
- []Field: 字段列表
- error: 错误信息
type FieldType ¶
type FieldType int
FieldType 字段类型(对应 Go 基础类型)
const ( // 有符号整数类型 Int FieldType Int8 Int16 Int32 Int64 // 无符号整数类型 Uint Uint8 Uint16 Uint32 Uint64 // 浮点类型 Float32 Float64 // 字符串类型 String // 布尔类型 Bool // Byte 和 Rune 类型(独立类型,语义上对应 Go 的 byte 和 rune) Byte // byte 类型(底层为 uint8) Rune // rune 类型(底层为 int32) // Decimal 类型(高精度十进制,用于金融计算) Decimal // 时间类型 Time // time.Time 时间戳 Duration // time.Duration 时间间隔 // 复杂类型 Object // map[string]xxx、struct{}、*struct{} Array // 切片类型 []xxx )
type FileMetadata ¶
type FileMetadata struct {
FileNumber int64 // 文件编号
Level int // 所在层级 (0-3)
FileSize int64 // 文件大小
MinKey int64 // 最小 key
MaxKey int64 // 最大 key
RowCount int64 // 行数
}
FileMetadata SST 文件元数据
type ImmutableMemTable ¶
ImmutableMemTable 不可变的 MemTable
type IndexBTreeReader ¶
type IndexBTreeReader struct {
// contains filtered or unexported fields
}
IndexBTreeReader 使用 B+Tree 读取索引
读取流程:
- mmap 映射整个文件(零拷贝)
- 读取 Header
- 创建 BTreeReader (指向 RootOffset)
- Get(value): a. value → key (MD5 哈希) b. BTree.Get(key) → (offset, size) c. 读取 mmap[offset:offset+size](零拷贝) d. 解码并验证原始 value e. 返回 seqs
性能优化:
- mmap 零拷贝:不需要加载整个文件到内存
- B+Tree 索引:O(log n) 查询
- 按需读取:只读取需要的数据块
func NewIndexBTreeReader ¶
func NewIndexBTreeReader(file *os.File) (*IndexBTreeReader, error)
NewIndexBTreeReader 创建索引读取器
func (*IndexBTreeReader) Close ¶
func (r *IndexBTreeReader) Close() error
Close 关闭读取器 注意:不关闭 file,因为它是从外部传入的,应该由调用者关闭
func (*IndexBTreeReader) ForEach ¶
func (r *IndexBTreeReader) ForEach(callback IndexEntryCallback)
ForEach 升序迭代所有索引条目 callback 返回 false 时停止迭代,支持提前终止
func (*IndexBTreeReader) ForEachDesc ¶
func (r *IndexBTreeReader) ForEachDesc(callback IndexEntryCallback)
ForEachDesc 降序迭代所有索引条目 callback 返回 false 时停止迭代,支持提前终止
func (*IndexBTreeReader) Get ¶
func (r *IndexBTreeReader) Get(value string) ([]int64, error)
Get 查询字段值对应的 seq 列表(零拷贝)
参数:
value: 字段值(例如 "Alice")
返回:
seqs: seq 列表(例如 [1, 5, 10]) err: 查询错误
查询流程:
- value → key (MD5 哈希)
- B+Tree.Get(key) → (offset, size)
- 读取 mmap[offset:offset+size](零拷贝)
- 解码并验证原始 value(处理哈希冲突)
- 返回 seqs
哈希冲突处理:
- 如果 storedValue != value,说明发生哈希冲突
- 返回 nil(表示未找到)
- 冲突概率极低(MD5 64位空间)
func (*IndexBTreeReader) GetMetadata ¶
func (r *IndexBTreeReader) GetMetadata() IndexMetadata
GetMetadata 获取元数据
type IndexBTreeWriter ¶
type IndexBTreeWriter struct {
// contains filtered or unexported fields
}
IndexBTreeWriter 使用 B+Tree 写入索引
写入流程:
- Add(): 收集所有 (value, seqs) 到内存
- Build(): a. 计算所有 value 的 key (MD5 哈希) b. 按 key 排序(B+Tree 要求有序) c. 编码所有条目为二进制格式 d. 构建 B+Tree 索引 e. 写入 Header + B+Tree + 数据块
文件布局:
[Header] → [B+Tree] → [Data Blocks]
func NewIndexBTreeWriter ¶
func NewIndexBTreeWriter(file *os.File, metadata IndexMetadata) *IndexBTreeWriter
NewIndexBTreeWriter 创建索引写入器
func (*IndexBTreeWriter) Add ¶
func (w *IndexBTreeWriter) Add(value string, seqs []int64)
Add 添加索引条目
type IndexEntryCallback ¶
IndexEntryCallback 索引条目回调函数 参数:value 字段值,seqs 对应的 seq 列表 返回:true 继续迭代,false 停止迭代
type IndexHeader ¶
type IndexHeader struct {
Magic uint32 // 魔数 "IDXB"
FormatVersion uint32 // 文件格式版本号
IndexVersion int64 // 索引版本号(对应 IndexMetadata.Version)
RootOffset int64 // B+Tree 根节点偏移
DataStart int64 // 数据块起始位置
MinSeq int64 // 最小 seq
MaxSeq int64 // 最大 seq
RowCount int64 // 总行数
CreatedAt int64 // 创建时间
UpdatedAt int64 // 更新时间
Reserved [184]byte // 预留空间(减少 8 字节给 IndexVersion,减少 8 字节调整对齐)
}
IndexHeader 索引文件头
func UnmarshalIndexHeader ¶
func UnmarshalIndexHeader(data []byte) *IndexHeader
UnmarshalIndexHeader 反序列化 Header
type IndexManager ¶
type IndexManager struct {
// contains filtered or unexported fields
}
IndexManager 索引管理器
func NewIndexManager ¶
func NewIndexManager(dir string, schema *Schema) *IndexManager
NewIndexManager 创建索引管理器
func (*IndexManager) AddToIndexes ¶
func (m *IndexManager) AddToIndexes(data map[string]any, seq int64) error
AddToIndexes 添加到所有索引
func (*IndexManager) CreateIndex ¶
func (m *IndexManager) CreateIndex(field string) error
CreateIndex 创建索引
func (*IndexManager) DropIndex ¶
func (m *IndexManager) DropIndex(field string) error
DropIndex 删除索引
func (*IndexManager) GetIndex ¶
func (m *IndexManager) GetIndex(field string) (*SecondaryIndex, bool)
GetIndex 获取索引
func (*IndexManager) GetIndexMetadata ¶
func (m *IndexManager) GetIndexMetadata() map[string]IndexMetadata
GetIndexMetadata 获取所有索引的元数据
func (*IndexManager) VerifyAndRepair ¶
func (m *IndexManager) VerifyAndRepair(currentMaxSeq int64, getData func(int64) (map[string]any, error)) error
VerifyAndRepair 验证并修复所有索引
type IndexMetadata ¶
type IndexMetadata struct {
Version int64 // 索引版本号
MaxSeq int64 // 索引包含的最大 seq
MinSeq int64 // 索引包含的最小 seq
RowCount int64 // 索引包含的行数
CreatedAt int64 // 创建时间
UpdatedAt int64 // 更新时间
}
IndexMetadata 索引元数据
type KeyCallback ¶
KeyCallback 迭代回调函数
参数:
- key: 当前的 key(序列号)
- dataOffset: 数据块的文件偏移量
- dataSize: 数据块的大小
返回:
- true: 继续迭代
- false: 停止迭代
type LevelStats ¶
type LevelStats struct {
Level int `json:"level"` // 层级编号 (0-3)
FileCount int `json:"file_count"` // 文件数量
TotalSize int64 `json:"total_size"` // 总大小(字节)
Score float64 `json:"score"` // Compaction 得分
}
LevelStats 层级统计信息
type ManifestReader ¶
type ManifestReader struct {
// contains filtered or unexported fields
}
ManifestReader MANIFEST 读取器
func NewManifestReader ¶
func NewManifestReader(file io.Reader) *ManifestReader
NewManifestReader 创建 MANIFEST 读取器
func (*ManifestReader) ReadEdit ¶
func (r *ManifestReader) ReadEdit() (*VersionEdit, error)
ReadEdit 读取版本变更
type ManifestWriter ¶
type ManifestWriter struct {
// contains filtered or unexported fields
}
ManifestWriter MANIFEST 写入器
func NewManifestWriter ¶
func NewManifestWriter(file io.Writer) *ManifestWriter
NewManifestWriter 创建 MANIFEST 写入器
func (*ManifestWriter) WriteEdit ¶
func (w *ManifestWriter) WriteEdit(edit *VersionEdit) error
WriteEdit 写入版本变更
type MemTableIterator ¶
type MemTableIterator struct {
// contains filtered or unexported fields
}
MemTableIterator 迭代器
type MemTableManager ¶
type MemTableManager struct {
// contains filtered or unexported fields
}
MemTableManager MemTable 管理器
func NewMemTableManager ¶
func NewMemTableManager(maxSize int64) *MemTableManager
NewMemTableManager 创建 MemTable 管理器
func (*MemTableManager) Get ¶
func (m *MemTableManager) Get(key int64) ([]byte, bool)
Get 查询数据(先查 Active,再查 Immutables)
func (*MemTableManager) GetActive ¶
func (m *MemTableManager) GetActive() *MemTable
GetActive 获取 Active MemTable(用于 Flush 时读取)
func (*MemTableManager) GetActiveCount ¶
func (m *MemTableManager) GetActiveCount() int
GetActiveCount 获取 Active MemTable 条目数
func (*MemTableManager) GetActiveSize ¶
func (m *MemTableManager) GetActiveSize() int64
GetActiveSize 获取 Active MemTable 大小
func (*MemTableManager) GetImmutableCount ¶
func (m *MemTableManager) GetImmutableCount() int
GetImmutableCount 获取 Immutable MemTable 数量
func (*MemTableManager) GetImmutables ¶
func (m *MemTableManager) GetImmutables() []*ImmutableMemTable
GetImmutables 获取所有 Immutable MemTables(副本)
func (*MemTableManager) GetStats ¶
func (m *MemTableManager) GetStats() *MemTableStats
GetStats 获取统计信息
func (*MemTableManager) NewIterator ¶
func (m *MemTableManager) NewIterator() *MemTableIterator
NewIterator 创建 Active MemTable 的迭代器
func (*MemTableManager) Put ¶
func (m *MemTableManager) Put(key int64, value []byte)
Put 写入数据到 Active MemTable
func (*MemTableManager) RemoveImmutable ¶
func (m *MemTableManager) RemoveImmutable(target *ImmutableMemTable)
RemoveImmutable 移除指定的 Immutable MemTable
func (*MemTableManager) SetActiveWAL ¶
func (m *MemTableManager) SetActiveWAL(walNumber int64)
SetActiveWAL 设置 Active MemTable 对应的 WAL 编号
func (*MemTableManager) ShouldSwitch ¶
func (m *MemTableManager) ShouldSwitch() bool
ShouldSwitch 检查是否需要切换 MemTable
func (*MemTableManager) Switch ¶
func (m *MemTableManager) Switch(newWALNumber int64) (oldWALNumber int64, immutable *ImmutableMemTable)
Switch 切换 MemTable(Active → Immutable,创建新 Active) 返回:旧的 WAL 编号,新的 Active MemTable
func (*MemTableManager) TotalCount ¶
func (m *MemTableManager) TotalCount() int
TotalCount 获取总条目数(Active + Immutables)
func (*MemTableManager) TotalSize ¶
func (m *MemTableManager) TotalSize() int64
TotalSize 获取总大小(Active + Immutables)
type MemTableStats ¶
type MemTableStats struct {
ActiveSize int64
ActiveCount int
ImmutableCount int
ImmutablesSize int64
ImmutablesTotal int
TotalSize int64
TotalCount int
}
MemTableStats 统计信息
type Neginative ¶
type Neginative struct {
// contains filtered or unexported fields
}
func (Neginative) Match ¶
func (n Neginative) Match(fs Fieldset) bool
type Options ¶
type Options struct {
// ========== 基础配置 ==========
Dir string // 数据库目录(必需)
Logger *slog.Logger // 日志器(可选,nil 表示不输出日志)
// ========== MemTable 配置 ==========
MemTableSize int64 // MemTable 大小限制(字节),默认 64MB
AutoFlushTimeout time.Duration // 自动 flush 超时时间,默认 30s,0 表示禁用
// ========== Compaction 配置 ==========
// 层级大小限制
Level0SizeLimit int64 // L0 层大小限制,默认 64MB
Level1SizeLimit int64 // L1 层大小限制,默认 256MB
Level2SizeLimit int64 // L2 层大小限制,默认 512MB
Level3SizeLimit int64 // L3 层大小限制,默认 1GB
// 后台任务间隔
CompactionInterval time.Duration // Compaction 检查间隔,默认 10s
GCInterval time.Duration // 垃圾回收检查间隔,默认 5min
// ========== 高级配置(可选)==========
DisableAutoCompaction bool // 禁用自动 Compaction,默认 false
DisableGC bool // 禁用垃圾回收,默认 false
GCFileMinAge time.Duration // GC 文件最小年龄,默认 1min
}
Options 数据库配置选项
type Picker ¶
type Picker struct {
// contains filtered or unexported fields
}
Picker 负责选择需要 Compaction 的文件
func (*Picker) GetCurrentStage ¶
GetCurrentStage 获取当前阶段(用于测试和调试)
func (*Picker) GetLevelScore ¶
GetLevelScore 获取每层的 Compaction 得分 (用于优先级排序) 得分越高,越需要 Compaction
func (*Picker) PickCompaction ¶
func (p *Picker) PickCompaction(version *Version) []*CompactionTask
PickCompaction 选择需要 Compaction 的任务(按阶段返回,阶段内并发执行) 返回空切片表示当前阶段不需要 Compaction
执行策略(4 阶段串行 + 阶段内并发): 1. Stage 0 - L0 合并:小文件合并,减少 L0 文件数(可能保持在 L0) 2. Stage 1 - L0 升级:大文件或已合并文件升级到 L1 3. Stage 2 - L1 升级:L1 文件升级到 L2 4. Stage 3 - L2 升级:L2 文件升级到 L3
阶段控制: - 使用 currentStage 跟踪当前应该执行哪个阶段(0-3) - 每次调用返回当前阶段的任务,然后自动推进到下一阶段 - 调用者应该循环调用此方法以确保所有阶段都被尝试
为什么阶段内可以并发? - 同一阶段的任务处理不同的文件批次(按 seq 连续划分) - 这些批次的文件不重叠,可以安全并发
为什么阶段间要串行? - L0 执行后可能产生新文件,影响下一阶段的任务计算 - 必须基于最新的 version 重新计算下一阶段的任务
func (*Picker) ShouldCompact ¶
ShouldCompact 判断是否需要 Compaction(只检查,不推进阶段)
func (*Picker) UpdateLevelLimits ¶
UpdateLevelLimits 更新层级大小限制(由 CompactionManager 调用)
type QueryBuilder ¶
type QueryBuilder struct {
// contains filtered or unexported fields
}
func (*QueryBuilder) Between ¶
func (qb *QueryBuilder) Between(field string, start, end any) *QueryBuilder
func (*QueryBuilder) Contains ¶
func (qb *QueryBuilder) Contains(field string, pattern string) *QueryBuilder
func (*QueryBuilder) EndsWith ¶
func (qb *QueryBuilder) EndsWith(field string, pattern string) *QueryBuilder
func (*QueryBuilder) Eq ¶
func (qb *QueryBuilder) Eq(field string, value any) *QueryBuilder
func (*QueryBuilder) Gt ¶
func (qb *QueryBuilder) Gt(field string, value any) *QueryBuilder
func (*QueryBuilder) Gte ¶
func (qb *QueryBuilder) Gte(field string, value any) *QueryBuilder
func (*QueryBuilder) In ¶
func (qb *QueryBuilder) In(field string, values []any) *QueryBuilder
func (*QueryBuilder) IsNull ¶
func (qb *QueryBuilder) IsNull(field string) *QueryBuilder
func (*QueryBuilder) Limit ¶
func (qb *QueryBuilder) Limit(n int) *QueryBuilder
Limit 设置返回的最大记录数 用于分页查询,最多返回 n 条记录 n = 0 表示无限制
func (*QueryBuilder) Lt ¶
func (qb *QueryBuilder) Lt(field string, value any) *QueryBuilder
func (*QueryBuilder) Lte ¶
func (qb *QueryBuilder) Lte(field string, value any) *QueryBuilder
func (*QueryBuilder) Match ¶
func (qb *QueryBuilder) Match(data map[string]any) bool
Match 检查数据是否匹配所有条件
func (*QueryBuilder) NotBetween ¶
func (qb *QueryBuilder) NotBetween(field string, start, end any) *QueryBuilder
func (*QueryBuilder) NotContains ¶
func (qb *QueryBuilder) NotContains(field string, pattern string) *QueryBuilder
func (*QueryBuilder) NotEndsWith ¶
func (qb *QueryBuilder) NotEndsWith(field string, pattern string) *QueryBuilder
func (*QueryBuilder) NotEq ¶
func (qb *QueryBuilder) NotEq(field string, value any) *QueryBuilder
func (*QueryBuilder) NotIn ¶
func (qb *QueryBuilder) NotIn(field string, values []any) *QueryBuilder
func (*QueryBuilder) NotNull ¶
func (qb *QueryBuilder) NotNull(field string) *QueryBuilder
func (*QueryBuilder) NotStartsWith ¶
func (qb *QueryBuilder) NotStartsWith(field string, pattern string) *QueryBuilder
func (*QueryBuilder) Offset ¶
func (qb *QueryBuilder) Offset(n int) *QueryBuilder
Offset 设置跳过的记录数 用于分页查询,跳过前 n 条记录
func (*QueryBuilder) OrderBy ¶
func (qb *QueryBuilder) OrderBy(field string) *QueryBuilder
OrderBy 设置排序字段(升序) 仅支持 "_seq" 或有索引的字段,使用其他字段会返回错误
func (*QueryBuilder) OrderByDesc ¶
func (qb *QueryBuilder) OrderByDesc(field string) *QueryBuilder
OrderByDesc 设置排序字段(降序) 仅支持 "_seq" 或有索引的字段,使用其他字段会返回错误
func (*QueryBuilder) Paginate ¶
func (qb *QueryBuilder) Paginate(page, pageSize int) (rows *Rows, total int, err error)
Paginate 执行分页查询并返回结果、总记录数和错误 page: 页码,从 1 开始 pageSize: 每页记录数 返回值:
- rows: 当前页的数据
- total: 满足条件的总记录数(用于计算总页数)
- err: 错误信息
注意:此方法会执行两次查询,第一次获取总数,第二次获取分页数据
func (*QueryBuilder) Rows ¶
func (qb *QueryBuilder) Rows() (*Rows, error)
Rows 返回所有匹配的数据(游标模式 - 惰性加载)
func (*QueryBuilder) Select ¶
func (qb *QueryBuilder) Select(fields ...string) *QueryBuilder
Select 指定要选择的字段,如果不调用则返回所有字段
func (*QueryBuilder) StartsWith ¶
func (qb *QueryBuilder) StartsWith(field string, pattern string) *QueryBuilder
func (*QueryBuilder) Where ¶
func (qb *QueryBuilder) Where(exprs ...Expr) *QueryBuilder
type SSTableHeader ¶
type SSTableHeader struct {
// 基础信息 (32 bytes)
Magic uint32 // Magic Number: 0x53535433
Version uint32 // 版本号
Compression uint8 // 压缩类型(保留字段用于向后兼容)
Reserved1 [3]byte
Flags uint32 // 标志位
Reserved2 [16]byte
// 索引信息 (32 bytes)
IndexOffset int64 // B+Tree 索引起始位置
IndexSize int64 // B+Tree 索引大小
RootOffset int64 // B+Tree 根节点位置
Reserved3 [8]byte
// 数据信息 (32 bytes)
DataOffset int64 // 数据块起始位置
DataSize int64 // 数据块总大小
RowCount int64 // 行数
Reserved4 [8]byte
// 统计信息 (32 bytes)
MinKey int64 // 最小 key (_seq)
MaxKey int64 // 最大 key (_seq)
MinTime int64 // 最小时间戳
MaxTime int64 // 最大时间戳
// CRC 校验 (8 bytes)
CRC32 uint32 // Header CRC32
Reserved5 [4]byte
// 预留空间 (120 bytes)
Reserved6 [120]byte
}
SSTableHeader SST 文件头 (256 bytes)
func UnmarshalSSTableHeader ¶
func UnmarshalSSTableHeader(data []byte) *SSTableHeader
Unmarshal 反序列化 Header
type SSTableManager ¶
type SSTableManager struct {
// contains filtered or unexported fields
}
SSTableManager SST 文件管理器
func NewSSTableManager ¶
func NewSSTableManager(dir string) (*SSTableManager, error)
NewSSTableManager 创建 SST 管理器
func (*SSTableManager) AddReader ¶
func (m *SSTableManager) AddReader(reader *SSTableReader)
AddReader 添加 reader 到管理器(用于 compaction 创建的新文件)
func (*SSTableManager) CreateSST ¶
func (m *SSTableManager) CreateSST(fileNumber int64, rows []*SSTableRow) (*SSTableReader, error)
CreateSST 创建新的 SST 文件 fileNumber: 文件编号(由 VersionSet 分配)
func (*SSTableManager) CreateSSTWithLevel ¶
func (m *SSTableManager) CreateSSTWithLevel(fileNumber int64, rows []*SSTableRow, level int) (*SSTableReader, error)
CreateSSTWithLevel 创建新的 SST 文件到指定层级 fileNumber: 文件编号(由 VersionSet 分配)
func (*SSTableManager) Get ¶
func (m *SSTableManager) Get(seq int64) (*SSTableRow, error)
Get 从所有 SST 文件中查找数据
func (*SSTableManager) GetMaxSeq ¶
func (m *SSTableManager) GetMaxSeq() int64
GetMaxSeq 获取所有 SST 中的最大 seq
func (*SSTableManager) GetPartial ¶
func (m *SSTableManager) GetPartial(seq int64, fields []string) (*SSTableRow, error)
GetPartial 从所有 SST 文件中按需查找数据(只读取指定字段)
func (*SSTableManager) GetReaders ¶
func (m *SSTableManager) GetReaders() []*SSTableReader
GetReaders 获取所有 Readers(用于扫描)
func (*SSTableManager) ListFiles ¶
func (m *SSTableManager) ListFiles() []string
ListFiles 列出所有 SST 文件
func (*SSTableManager) RemoveReader ¶
func (m *SSTableManager) RemoveReader(fileNumber int64) error
RemoveReader 移除指定文件编号的 reader(用于 compaction)
func (*SSTableManager) SetSchema ¶
func (m *SSTableManager) SetSchema(schema *Schema)
SetSchema 设置 Schema(用于优化编解码)
type SSTableReader ¶
type SSTableReader struct {
// contains filtered or unexported fields
}
SSTableReader SST 文件读取器
func NewSSTableReader ¶
func NewSSTableReader(path string) (*SSTableReader, error)
NewSSTableReader 创建 SST 读取器
func (*SSTableReader) ForEach ¶
func (r *SSTableReader) ForEach(callback KeyCallback)
ForEach 升序迭代所有 key-offset-size 对 callback 返回 false 时停止迭代,支持提前终止
func (*SSTableReader) ForEachDesc ¶
func (r *SSTableReader) ForEachDesc(callback KeyCallback)
ForEachDesc 降序迭代所有 key-offset-size 对 callback 返回 false 时停止迭代,支持提前终止
func (*SSTableReader) GetAllKeys ¶
func (r *SSTableReader) GetAllKeys() []int64
GetAllKeys 获取文件中所有的 key(按顺序)
func (*SSTableReader) GetHeader ¶
func (r *SSTableReader) GetHeader() *SSTableHeader
GetHeader 获取文件头信息
func (*SSTableReader) GetPartial ¶
func (r *SSTableReader) GetPartial(key int64, fields []string) (*SSTableRow, error)
GetPartial 按需查询一行数据(只读取指定字段)
func (*SSTableReader) SetSchema ¶
func (r *SSTableReader) SetSchema(schema *Schema)
SetSchema 设置 Schema(用于优化编解码)
type SSTableRow ¶
SSTableRow 表示一行数据
type SSTableStats ¶
SSTableStats 统计信息
type SSTableWriter ¶
type SSTableWriter struct {
// contains filtered or unexported fields
}
SSTableWriter SST 文件写入器
func NewSSTableWriter ¶
func NewSSTableWriter(file *os.File, schema *Schema) *SSTableWriter
NewSSTableWriter 创建 SST 写入器
type Schema ¶
Schema 表结构定义
func NewSchema ¶
NewSchema 创建 Schema 参数:
- name: Schema 名称,不能为空
- fields: 字段列表,至少需要 1 个字段
返回:
- *Schema: Schema 实例
- error: 错误信息
func (*Schema) ComputeChecksum ¶
ComputeChecksum 计算 Schema 的 SHA256 校验和 使用确定性的字符串拼接算法,不依赖 json.Marshal 这样即使 Schema struct 添加新字段,只要核心内容(Name、Fields)不变,checksum 就不会变 重要:字段顺序不影响 checksum,会先按字段名排序 格式: "name:<name>;fields:<field1_name>:<field1_type>:<field1_indexed>:<field1_comment>,<field2>..."
func (*Schema) ExtractIndexValue ¶
ExtractIndexValue 提取索引值(支持类型转换)
func (*Schema) GetIndexedFields ¶
GetIndexedFields 获取所有需要索引的字段
type SchemaFile ¶
type SchemaFile struct {
Version int `json:"version"` // 文件格式版本
Timestamp int64 `json:"timestamp"` // 保存时间戳
Checksum string `json:"checksum"` // Schema 内容的 SHA256 校验和
Schema *Schema `json:"schema"` // Schema 内容
}
SchemaFile Schema 文件格式(带校验)
func NewSchemaFile ¶
func NewSchemaFile(schema *Schema) (*SchemaFile, error)
NewSchemaFile 创建带校验和的 Schema 文件
type SecondaryIndex ¶
type SecondaryIndex struct {
// contains filtered or unexported fields
}
SecondaryIndex 二级索引
func NewSecondaryIndex ¶
func NewSecondaryIndex(dir, field string, fieldType FieldType) (*SecondaryIndex, error)
NewSecondaryIndex 创建二级索引
func (*SecondaryIndex) Add ¶
func (idx *SecondaryIndex) Add(value any, seq int64) error
Add 添加索引条目(增量更新元数据)
func (*SecondaryIndex) ForEach ¶
func (idx *SecondaryIndex) ForEach(callback IndexEntryCallback) error
ForEach 升序迭代所有索引条目 callback 返回 false 时停止迭代,支持提前终止 注意:只能迭代已持久化的数据(B+Tree),不包括内存中未持久化的数据
func (*SecondaryIndex) ForEachDesc ¶
func (idx *SecondaryIndex) ForEachDesc(callback IndexEntryCallback) error
ForEachDesc 降序迭代所有索引条目 callback 返回 false 时停止迭代,支持提前终止 注意:只能迭代已持久化的数据(B+Tree),不包括内存中未持久化的数据
func (*SecondaryIndex) Get ¶
func (idx *SecondaryIndex) Get(value any) ([]int64, error)
Get 查询索引(优先查内存,然后查磁盘,合并结果)
func (*SecondaryIndex) GetMetadata ¶
func (idx *SecondaryIndex) GetMetadata() IndexMetadata
GetMetadata 获取元数据
func (*SecondaryIndex) IncrementalUpdate ¶
func (idx *SecondaryIndex) IncrementalUpdate(getData func(int64) (map[string]any, error), fromSeq, toSeq int64) error
IncrementalUpdate 增量更新索引
func (*SecondaryIndex) NeedsUpdate ¶
func (idx *SecondaryIndex) NeedsUpdate(currentMaxSeq int64) bool
NeedsUpdate 检查是否需要更新
type Table ¶
type Table struct {
// contains filtered or unexported fields
}
Table 表
func (*Table) GetCompactionManager ¶
func (t *Table) GetCompactionManager() *CompactionManager
GetCompactionManager 获取 Compaction Manager(用于高级操作)
func (*Table) GetIndex ¶
func (t *Table) GetIndex(field string) (*SecondaryIndex, bool)
GetIndex 获取指定字段的索引
func (*Table) GetIndexMetadata ¶
func (t *Table) GetIndexMetadata() map[string]IndexMetadata
GetIndexMetadata 获取索引元数据
func (*Table) GetMemtableManager ¶
func (t *Table) GetMemtableManager() *MemTableManager
GetMemtableManager 获取 Memtable Manager
func (*Table) GetPartial ¶
func (t *Table) GetPartial(seq int64, fields []string) (*SSTableRow, error)
GetPartial 按需查询数据(只读取指定字段)
func (*Table) GetSSTManager ¶
func (t *Table) GetSSTManager() *SSTableManager
GetSSTManager 获取 SST Manager
func (*Table) GetVersionSet ¶
func (t *Table) GetVersionSet() *VersionSet
GetVersionSet 获取 VersionSet(用于高级操作)
func (*Table) Insert ¶
Insert 插入数据(支持单条或批量) 支持的类型:
- map[string]any: 单条数据
- []map[string]any: 批量数据
- *struct{}: 单个结构体指针
- []struct{}: 结构体切片
- []*struct{}: 结构体指针切片
type TableInfo ¶
type TableInfo struct {
Name string `json:"name"`
Dir string `json:"dir"`
CreatedAt int64 `json:"created_at"`
}
TableInfo 表信息
type TableOptions ¶
type TableOptions struct {
Dir string
MemTableSize int64
Name string // 表名
Fields []Field // 字段列表(可选)
AutoFlushTimeout time.Duration // 自动 flush 超时时间,0 表示禁用
}
TableOptions 配置选项
type TableStats ¶
TableStats 统计信息
type Version ¶
type Version struct {
// 分层存储 SST 文件 (L0-L3)
Levels [NumLevels][]*FileMetadata
// 下一个文件编号
NextFileNumber int64
// 最后序列号
LastSequence int64
// 版本号
VersionNumber int64
// contains filtered or unexported fields
}
Version 数据库的一个版本快照
func (*Version) GetLevelFileCount ¶
GetLevelFileCount 获取指定层级的文件数量
func (*Version) GetNextFileNumber ¶
GetNextFileNumber 获取下一个文件编号
func (*Version) GetSSTFiles ¶
func (v *Version) GetSSTFiles() []*FileMetadata
GetSSTFiles 获取所有 SST 文件(副本,兼容旧接口)
type VersionEdit ¶
type VersionEdit struct {
// 添加的文件
AddedFiles []*FileMetadata
// 删除的文件(文件编号列表)
DeletedFiles []int64
// 下一个文件编号
NextFileNumber *int64
// 最后序列号
LastSequence *int64
}
VersionEdit 版本变更记录
func (*VersionEdit) SetLastSequence ¶
func (e *VersionEdit) SetLastSequence(seq int64)
SetLastSequence 设置最后序列号
func (*VersionEdit) SetNextFileNumber ¶
func (e *VersionEdit) SetNextFileNumber(num int64)
SetNextFileNumber 设置下一个文件编号
type VersionSet ¶
type VersionSet struct {
// contains filtered or unexported fields
}
VersionSet 版本集合管理器
func (*VersionSet) AllocateFileNumber ¶
func (vs *VersionSet) AllocateFileNumber() int64
AllocateFileNumber 分配文件编号
func (*VersionSet) GetLastSequence ¶
func (vs *VersionSet) GetLastSequence() int64
GetLastSequence 获取最后序列号
func (*VersionSet) GetNextFileNumber ¶
func (vs *VersionSet) GetNextFileNumber() int64
GetNextFileNumber 获取下一个文件编号
func (*VersionSet) LogAndApply ¶
func (vs *VersionSet) LogAndApply(edit *VersionEdit) error
LogAndApply 记录并应用版本变更
func (*VersionSet) SetLastSequence ¶
func (vs *VersionSet) SetLastSequence(seq int64)
SetLastSequence 设置最后序列号
type WALManager ¶
type WALManager struct {
// contains filtered or unexported fields
}
WALManager WAL 管理器,管理多个 WAL 文件
func (*WALManager) GetCurrentNumber ¶
func (m *WALManager) GetCurrentNumber() int64
GetCurrentNumber 获取当前 WAL 编号
func (*WALManager) ListWALFiles ¶
func (m *WALManager) ListWALFiles() ([]string, error)
ListWALFiles 列出所有 WAL 文件
func (*WALManager) RecoverAll ¶
func (m *WALManager) RecoverAll() ([]*WALEntry, error)
RecoverAll 恢复所有 WAL 文件