vfs

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

The "urnadb" module provides the core functionality for database data persistence and storage engine within the system.

Index

Constants

View Source
const (
	// 使用整数位标志存储状态
	EnabledEncryption  = 1 << iota // 1: 0001
	EnabledCompression             // 2: 0010
)
View Source
const ImmortalTTL = -1

Variables

View Source
var (
	AESBlockCipher   = new(Cryptor)
	SnappyCompressor = new(Snappy)
)
View Source
var ErrEmptyBeginSnapshot = errors.New("unexpected empty begin snapshot")

Functions

This section is empty.

Types

type Compressor

type Compressor interface {
	Compress(data []byte) ([]byte, error)
	Decompress(data []byte) ([]byte, error)
}

压缩和解密应该针对数据的 VALUE ? 部分进行压缩,这里针对的是不定长部分进行压缩和解密 | DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 |

type Cryptor

type Cryptor struct{}

func (*Cryptor) Decrypt

func (*Cryptor) Decrypt(secret, ciphertext []byte) ([]byte, error)

func (*Cryptor) Encrypt

func (*Cryptor) Encrypt(secret, plaintext []byte) ([]byte, error)

type Encryptor

type Encryptor interface {
	Encrypt(secret, plianttext []byte) ([]byte, error)
	Decrypt(secret, ciphertext []byte) ([]byte, error)
}

type LogStructuredFS

type LogStructuredFS struct {
	// contains filtered or unexported fields
}

LogStructuredFS represents the virtual file storage system.

func OpenFS

func OpenFS(opt *Options) (*LogStructuredFS, error)

func (*LogStructuredFS) BatchFetchSegments

func (lfs *LogStructuredFS) BatchFetchSegments(keys ...string) ([]*Segment, error)

func (*LogStructuredFS) CloseFS

func (lfs *LogStructuredFS) CloseFS() error

Before closing, always check if GC (garbage collection) is executing. If GC is executing, do not close blindly.

func (*LogStructuredFS) CommitTxns

func (lfs *LogStructuredFS) CommitTxns(snapshots map[string]*Snapshot) error

func (*LogStructuredFS) DeleteSegment

func (lfs *LogStructuredFS) DeleteSegment(key string) error

func (*LogStructuredFS) ExportSnapshotIndex

func (lfs *LogStructuredFS) ExportSnapshotIndex() error

ExportSnapshotIndex is the operation performed during a normal program exit. exporting the in-memory index snapshot to a file on disk. The current design has limitations for systems with low memory resources, such as those with RAM of 512 MB < 1 GB. If a 1 GB snapshot cannot be fully serialized to disk, mapping large files into memory may not be a good choice, as it consumes a significant amount of virtual memory space and may lead to swapping memory pages to disk.

func (*LogStructuredFS) FetchSegment

func (lfs *LogStructuredFS) FetchSegment(key string) (uint64, *Segment, error)

func (*LogStructuredFS) GCState

func (lfs *LogStructuredFS) GCState() uint8

GCState returns the current garbage collection (GC) state of the LogStructuredFS regions compressor worker.

func (*LogStructuredFS) GetDirectory

func (lfs *LogStructuredFS) GetDirectory() string

func (*LogStructuredFS) GetTotalSpaceUsed

func (lfs *LogStructuredFS) GetTotalSpaceUsed() uint64

GetTotalSpaceUsed 获取当前 NoSQL 文件存储系统使用的总空间

func (*LogStructuredFS) IsActive

func (lfs *LogStructuredFS) IsActive(key string) bool

func (*LogStructuredFS) NewTransaction

func (store *LogStructuredFS) NewTransaction() (*Transaction, error)

这里的 keys 是事务涉及到的 key 列表,事务执行过程中会对这些 key 进行读写操作, 所以需要在事务开始的时候就把这些 key 传进来,这样就可以在事务执行过程中对这些 key 进行操作了。 拿到这些 key 对应磁盘老版本数据写到 .txn 文件中,这样就保证了能在事物执行失败时执行回滚操作。

func (*LogStructuredFS) PutSegment

func (lfs *LogStructuredFS) PutSegment(key string, seg *Segment) error

PutSegment inserts a Segment record into the LogStructuredFS virtual file system.

func (*LogStructuredFS) RefreshInodeCount

func (lfs *LogStructuredFS) RefreshInodeCount() uint64

RefreshInodeCount iterate over each index in lfs.indexs.

func (*LogStructuredFS) RollbackTxns

func (lfs *LogStructuredFS) RollbackTxns(keys []string, snapshots map[string]*Snapshot) error

func (*LogStructuredFS) RunCheckpoint

func (lfs *LogStructuredFS) RunCheckpoint(second uint32)

func (*LogStructuredFS) RunCompactRegion

func (lfs *LogStructuredFS) RunCompactRegion(schedule string) error

RunCompactRegion 使用 robfig/cron 调度垃圾回收

func (*LogStructuredFS) SetCompressor

func (*LogStructuredFS) SetCompressor(compressor Compressor)

func (*LogStructuredFS) SetEncryptor

func (*LogStructuredFS) SetEncryptor(encryptor Encryptor, secret []byte) error

func (*LogStructuredFS) StopCheckpoint

func (lfs *LogStructuredFS) StopCheckpoint()

func (*LogStructuredFS) StopCompactRegion

func (lfs *LogStructuredFS) StopCompactRegion()

StopCompactRegion 关闭垃圾回收

func (*LogStructuredFS) StopExpireLoop

func (lfs *LogStructuredFS) StopExpireLoop()

type Options

type Options struct {
	Path      string
	FSPerm    os.FileMode
	Threshold uint8
}

type Pipeline

type Pipeline struct {
	Encryptor
	Compressor
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline() *Pipeline

func (*Pipeline) Decode

func (p *Pipeline) Decode(data []byte) ([]byte, error)

fd 必须实现 io.ReadWriteCloser 接口

func (*Pipeline) DisableAll

func (p *Pipeline) DisableAll()

func (*Pipeline) DisableCompression

func (p *Pipeline) DisableCompression()

func (*Pipeline) DisableEncryption

func (p *Pipeline) DisableEncryption()

func (*Pipeline) EnableCompression

func (p *Pipeline) EnableCompression()

func (*Pipeline) EnableEncryption

func (p *Pipeline) EnableEncryption()

func (*Pipeline) Encode

func (p *Pipeline) Encode(data []byte) ([]byte, error)

func (*Pipeline) IsCompressionEnabled

func (p *Pipeline) IsCompressionEnabled() bool

func (*Pipeline) IsEncryptionEnabled

func (p *Pipeline) IsEncryptionEnabled() bool

func (*Pipeline) SetCompressor

func (p *Pipeline) SetCompressor(compressor Compressor)

func (*Pipeline) SetEncryptor

func (p *Pipeline) SetEncryptor(encryptor Encryptor, secret []byte) error

type Region

type Region struct {
	Fd *os.File
	*mmap.ReaderAt
}

type Segment

type Segment struct {
	Tombstone int8
	Type      kind
	ExpiredAt int64
	CreatedAt int64
	KeySize   int32
	ValueSize int32
	Key       []byte
	Value     []byte
}

| DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 |

func AcquirePoolSegment

func AcquirePoolSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)

func NewSegment

func NewSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)

NewSegment 使用数据类型初始化并返回对应的 Segment

func NewSegmentWithExpiry

func NewSegmentWithExpiry[T Serializable](data T, createdAt, expiredAt int64) (*Segment, error)

NewSegmentWithExpiry 使用数据类型和元信息初始化并返回对应的 Segment,适用于基于已有过期时间的 segment 的更新操作

func NewTombstoneSegment

func NewTombstoneSegment(key string) *Segment

func (*Segment) Clear

func (s *Segment) Clear()

func (*Segment) ExpiresIn

func (s *Segment) ExpiresIn() (int64, bool)

ExpiresIn 返回剩下的存活时间,一般在基于原有的 segment 更新时使用, 如果返回 -1,表示这个 segment 永不过期,并且返回 ok = true 表示这个 segment 没有过期。 如果返回 0,表示这个 segment 已经过期,ok = false 表示这个 segment 已经过期。 剩下的情况是返回剩下的存活时间,并且 ok = true 表示这个 segment 没有过期。

func (*Segment) GetExpiryMeta

func (s *Segment) GetExpiryMeta() (int64, int64)

GetExpiryMeta 返回 Segment 的元信息,包括创建时间和过期时间,适用于基于已有过期时间的 segment 的更新操作

func (*Segment) IsTombstone

func (s *Segment) IsTombstone() bool

func (*Segment) KeyString

func (s *Segment) KeyString() string

func (*Segment) Payload

func (s *Segment) Payload() ([]byte, uint32)

Payload 返回 Segment 的值和长度 注意:这里的长度是 Value 的实际字节长度,不包括 padding 和其他字段

func (*Segment) ReleaseToPool

func (s *Segment) ReleaseToPool()

func (*Segment) Serialize

func (seg *Segment) Serialize() ([]byte, error)

func (*Segment) Size

func (s *Segment) Size() int32

func (*Segment) ToJSON

func (s *Segment) ToJSON() ([]byte, error)

func (*Segment) ToLeaseLock

func (s *Segment) ToLeaseLock() (*types.LeaseLock, error)

func (*Segment) ToRecord

func (s *Segment) ToRecord() (*types.Record, error)

func (*Segment) ToTable

func (s *Segment) ToTable() (*types.Table, error)

func (*Segment) ToVariant

func (s *Segment) ToVariant() (*types.Variant, error)

func (*Segment) TypeString

func (s *Segment) TypeString() string

type Serializable

type Serializable interface {
	ToBytes() ([]byte, error)
}

type Snappy

type Snappy struct{}

func (*Snappy) Compress

func (*Snappy) Compress(data []byte) ([]byte, error)

func (*Snappy) Decompress

func (*Snappy) Decompress(data []byte) ([]byte, error)

type Snapshot

type Snapshot struct {
	*Segment
	// contains filtered or unexported fields
}

为什么单独设计一个 Snapshot 是因为需要做事物中的 key 对应的版本冲突检测。

func NewSnapshot

func NewSnapshot(seg *Segment, mvcc uint64) *Snapshot

func (*Snapshot) Version

func (s *Snapshot) Version() uint64

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

只要磁盘上有 .txn 文件了就说明有未提交的事务了,上次运行过程中有事物为能成功执行。 存储引擎在启动的时候就要去读取 .txn 文件中的数据来恢复未提交对应 key 的老数据版本。 为什么要有 .txn 文件,是因为事物执行过程中可能会有一些数据写入到磁盘上了。 但是还没有提交,万一这个时候系统崩溃了数据就丢了,所以要有 .txn 文件来记录是对应 key 的老数据版本。 等系统重启的时候再去读取 .txn 文件中的数据来恢复对应 key 的老数据版本,这样就保证了数据的安全性和一致性了。

func (*Transaction) AtomicBatch

func (t *Transaction) AtomicBatch(callback func(txns *TxnState) error)

本次事物执行过程中新 keys 对应的新版本的 segment 进行持久化到 .db 文件中并且更新索引 inode 和 .db 文件映射关系。

func (*Transaction) Commit

func (t *Transaction) Commit() error

这样下去启动的时候存储引擎就不会再去读取 .txn 文件了, 没有 .txn 文件了就说明没有未提交的事务了,事物执行成功了,一定要做版本控制检查器,检查每个数据的版本。 Commit 最重要一个环境就是对应 key 的新版本的 segment 进行持久化到 .db 文件中并且更新索引 inode 和 .db 文件映射关系。

func (*Transaction) Rollback

func (t *Transaction) Rollback() error

这里 Rollback 是将缓冲区对应磁盘 .txn 中的数据写会到 .db 文件中, 写回操作要注意更新索引 inode 和 .db 文件映射关系,同样删除 .txn 文件, 这样下去启动的时候存储引擎就不会再去读取 .txn 文件了。

func (*Transaction) TxnID

func (t *Transaction) TxnID() uint64

type TxnState

type TxnState struct {
	// contains filtered or unexported fields
}

func (*TxnState) Begin

func (txns *TxnState) Begin(keys []string) (map[string]*Snapshot, error)

事物开始的时候对本次事物需要的 keys 进行批量获取操作,方便后面进行运算。

func (*TxnState) Save

func (txns *TxnState) Save(snaps map[string]*Snapshot) error

运算完成之后的结果进行持久化存储,注意这里的 segs 可能有新加入的新 key 不在 Begin 中返回的。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL