Documentation
¶
Overview ¶
The "urnadb" module provides the core functionality for database data persistence and storage engine within the system.
Index ¶
- Constants
- Variables
- type Compressor
- type Cryptor
- type Encryptor
- type LogStructuredFS
- func (lfs *LogStructuredFS) BatchFetchSegments(keys ...string) ([]*Segment, error)
- func (lfs *LogStructuredFS) CloseFS() error
- func (lfs *LogStructuredFS) CommitTxns(snapshots map[string]*Snapshot) error
- func (lfs *LogStructuredFS) DeleteSegment(key string) error
- func (lfs *LogStructuredFS) ExportSnapshotIndex() error
- func (lfs *LogStructuredFS) FetchSegment(key string) (uint64, *Segment, error)
- func (lfs *LogStructuredFS) GCState() uint8
- func (lfs *LogStructuredFS) GetDirectory() string
- func (lfs *LogStructuredFS) GetTotalSpaceUsed() uint64
- func (lfs *LogStructuredFS) IsActive(key string) bool
- func (store *LogStructuredFS) NewTransaction() (*Transaction, error)
- func (lfs *LogStructuredFS) PutSegment(key string, seg *Segment) error
- func (lfs *LogStructuredFS) RefreshInodeCount() uint64
- func (lfs *LogStructuredFS) RollbackTxns(keys []string, snapshots map[string]*Snapshot) error
- func (lfs *LogStructuredFS) RunCheckpoint(second uint32)
- func (lfs *LogStructuredFS) RunCompactRegion(schedule string) error
- func (*LogStructuredFS) SetCompressor(compressor Compressor)
- func (*LogStructuredFS) SetEncryptor(encryptor Encryptor, secret []byte) error
- func (lfs *LogStructuredFS) StopCheckpoint()
- func (lfs *LogStructuredFS) StopCompactRegion()
- func (lfs *LogStructuredFS) StopExpireLoop()
- type Options
- type Pipeline
- func (p *Pipeline) Decode(data []byte) ([]byte, error)
- func (p *Pipeline) DisableAll()
- func (p *Pipeline) DisableCompression()
- func (p *Pipeline) DisableEncryption()
- func (p *Pipeline) EnableCompression()
- func (p *Pipeline) EnableEncryption()
- func (p *Pipeline) Encode(data []byte) ([]byte, error)
- func (p *Pipeline) IsCompressionEnabled() bool
- func (p *Pipeline) IsEncryptionEnabled() bool
- func (p *Pipeline) SetCompressor(compressor Compressor)
- func (p *Pipeline) SetEncryptor(encryptor Encryptor, secret []byte) error
- type Region
- type Segment
- func AcquirePoolSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)
- func NewSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)
- func NewSegmentWithExpiry[T Serializable](data T, createdAt, expiredAt int64) (*Segment, error)
- func NewTombstoneSegment(key string) *Segment
- func (s *Segment) Clear()
- func (s *Segment) ExpiresIn() (int64, bool)
- func (s *Segment) GetExpiryMeta() (int64, int64)
- func (s *Segment) IsTombstone() bool
- func (s *Segment) KeyString() string
- func (s *Segment) Payload() ([]byte, uint32)
- func (s *Segment) ReleaseToPool()
- func (seg *Segment) Serialize() ([]byte, error)
- func (s *Segment) Size() int32
- func (s *Segment) ToJSON() ([]byte, error)
- func (s *Segment) ToLeaseLock() (*types.LeaseLock, error)
- func (s *Segment) ToRecord() (*types.Record, error)
- func (s *Segment) ToTable() (*types.Table, error)
- func (s *Segment) ToVariant() (*types.Variant, error)
- func (s *Segment) TypeString() string
- type Serializable
- type Snappy
- type Snapshot
- type Transaction
- type TxnState
Constants ¶
const ( // 使用整数位标志存储状态 EnabledEncryption = 1 << iota // 1: 0001 EnabledCompression // 2: 0010 )
const ImmortalTTL = -1
Variables ¶
var ( AESBlockCipher = new(Cryptor) SnappyCompressor = new(Snappy) )
var ErrEmptyBeginSnapshot = errors.New("unexpected empty begin snapshot")
Functions ¶
This section is empty.
Types ¶
type Compressor ¶
type Compressor interface {
Compress(data []byte) ([]byte, error)
Decompress(data []byte) ([]byte, error)
}
压缩和解密应该针对数据的 VALUE ? 部分进行压缩,这里针对的是不定长部分进行压缩和解密 | DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 |
type LogStructuredFS ¶
type LogStructuredFS struct {
// contains filtered or unexported fields
}
LogStructuredFS represents the virtual file storage system.
func OpenFS ¶
func OpenFS(opt *Options) (*LogStructuredFS, error)
func (*LogStructuredFS) BatchFetchSegments ¶
func (lfs *LogStructuredFS) BatchFetchSegments(keys ...string) ([]*Segment, error)
func (*LogStructuredFS) CloseFS ¶
func (lfs *LogStructuredFS) CloseFS() error
Before closing, always check if GC (garbage collection) is executing. If GC is executing, do not close blindly.
func (*LogStructuredFS) CommitTxns ¶
func (lfs *LogStructuredFS) CommitTxns(snapshots map[string]*Snapshot) error
func (*LogStructuredFS) DeleteSegment ¶
func (lfs *LogStructuredFS) DeleteSegment(key string) error
func (*LogStructuredFS) ExportSnapshotIndex ¶
func (lfs *LogStructuredFS) ExportSnapshotIndex() error
ExportSnapshotIndex is the operation performed during a normal program exit. exporting the in-memory index snapshot to a file on disk. The current design has limitations for systems with low memory resources, such as those with RAM of 512 MB < 1 GB. If a 1 GB snapshot cannot be fully serialized to disk, mapping large files into memory may not be a good choice, as it consumes a significant amount of virtual memory space and may lead to swapping memory pages to disk.
func (*LogStructuredFS) FetchSegment ¶
func (lfs *LogStructuredFS) FetchSegment(key string) (uint64, *Segment, error)
func (*LogStructuredFS) GCState ¶
func (lfs *LogStructuredFS) GCState() uint8
GCState returns the current garbage collection (GC) state of the LogStructuredFS regions compressor worker.
func (*LogStructuredFS) GetDirectory ¶
func (lfs *LogStructuredFS) GetDirectory() string
func (*LogStructuredFS) GetTotalSpaceUsed ¶
func (lfs *LogStructuredFS) GetTotalSpaceUsed() uint64
GetTotalSpaceUsed 获取当前 NoSQL 文件存储系统使用的总空间
func (*LogStructuredFS) IsActive ¶
func (lfs *LogStructuredFS) IsActive(key string) bool
func (*LogStructuredFS) NewTransaction ¶
func (store *LogStructuredFS) NewTransaction() (*Transaction, error)
这里的 keys 是事务涉及到的 key 列表,事务执行过程中会对这些 key 进行读写操作, 所以需要在事务开始的时候就把这些 key 传进来,这样就可以在事务执行过程中对这些 key 进行操作了。 拿到这些 key 对应磁盘老版本数据写到 .txn 文件中,这样就保证了能在事物执行失败时执行回滚操作。
func (*LogStructuredFS) PutSegment ¶
func (lfs *LogStructuredFS) PutSegment(key string, seg *Segment) error
PutSegment inserts a Segment record into the LogStructuredFS virtual file system.
func (*LogStructuredFS) RefreshInodeCount ¶
func (lfs *LogStructuredFS) RefreshInodeCount() uint64
RefreshInodeCount iterate over each index in lfs.indexs.
func (*LogStructuredFS) RollbackTxns ¶
func (lfs *LogStructuredFS) RollbackTxns(keys []string, snapshots map[string]*Snapshot) error
func (*LogStructuredFS) RunCheckpoint ¶
func (lfs *LogStructuredFS) RunCheckpoint(second uint32)
func (*LogStructuredFS) RunCompactRegion ¶
func (lfs *LogStructuredFS) RunCompactRegion(schedule string) error
RunCompactRegion 使用 robfig/cron 调度垃圾回收
func (*LogStructuredFS) SetCompressor ¶
func (*LogStructuredFS) SetCompressor(compressor Compressor)
func (*LogStructuredFS) SetEncryptor ¶
func (*LogStructuredFS) SetEncryptor(encryptor Encryptor, secret []byte) error
func (*LogStructuredFS) StopCheckpoint ¶
func (lfs *LogStructuredFS) StopCheckpoint()
func (*LogStructuredFS) StopCompactRegion ¶
func (lfs *LogStructuredFS) StopCompactRegion()
StopCompactRegion 关闭垃圾回收
func (*LogStructuredFS) StopExpireLoop ¶
func (lfs *LogStructuredFS) StopExpireLoop()
type Pipeline ¶
type Pipeline struct {
Encryptor
Compressor
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline() *Pipeline
func (*Pipeline) DisableAll ¶
func (p *Pipeline) DisableAll()
func (*Pipeline) DisableCompression ¶
func (p *Pipeline) DisableCompression()
func (*Pipeline) DisableEncryption ¶
func (p *Pipeline) DisableEncryption()
func (*Pipeline) EnableCompression ¶
func (p *Pipeline) EnableCompression()
func (*Pipeline) EnableEncryption ¶
func (p *Pipeline) EnableEncryption()
func (*Pipeline) IsCompressionEnabled ¶
func (*Pipeline) IsEncryptionEnabled ¶
func (*Pipeline) SetCompressor ¶
func (p *Pipeline) SetCompressor(compressor Compressor)
type Segment ¶
type Segment struct {
Tombstone int8
Type kind
ExpiredAt int64
CreatedAt int64
KeySize int32
ValueSize int32
Key []byte
Value []byte
}
| DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 |
func AcquirePoolSegment ¶
func AcquirePoolSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)
func NewSegment ¶
func NewSegment[T Serializable](key string, data T, ttl int64) (*Segment, error)
NewSegment 使用数据类型初始化并返回对应的 Segment
func NewSegmentWithExpiry ¶
func NewSegmentWithExpiry[T Serializable](data T, createdAt, expiredAt int64) (*Segment, error)
NewSegmentWithExpiry 使用数据类型和元信息初始化并返回对应的 Segment,适用于基于已有过期时间的 segment 的更新操作
func NewTombstoneSegment ¶
func (*Segment) ExpiresIn ¶
ExpiresIn 返回剩下的存活时间,一般在基于原有的 segment 更新时使用, 如果返回 -1,表示这个 segment 永不过期,并且返回 ok = true 表示这个 segment 没有过期。 如果返回 0,表示这个 segment 已经过期,ok = false 表示这个 segment 已经过期。 剩下的情况是返回剩下的存活时间,并且 ok = true 表示这个 segment 没有过期。
func (*Segment) GetExpiryMeta ¶
GetExpiryMeta 返回 Segment 的元信息,包括创建时间和过期时间,适用于基于已有过期时间的 segment 的更新操作
func (*Segment) IsTombstone ¶
func (*Segment) ReleaseToPool ¶
func (s *Segment) ReleaseToPool()
func (*Segment) TypeString ¶
type Serializable ¶
type Snapshot ¶
type Snapshot struct {
*Segment
// contains filtered or unexported fields
}
为什么单独设计一个 Snapshot 是因为需要做事物中的 key 对应的版本冲突检测。
func NewSnapshot ¶
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
只要磁盘上有 .txn 文件了就说明有未提交的事务了,上次运行过程中有事物为能成功执行。 存储引擎在启动的时候就要去读取 .txn 文件中的数据来恢复未提交对应 key 的老数据版本。 为什么要有 .txn 文件,是因为事物执行过程中可能会有一些数据写入到磁盘上了。 但是还没有提交,万一这个时候系统崩溃了数据就丢了,所以要有 .txn 文件来记录是对应 key 的老数据版本。 等系统重启的时候再去读取 .txn 文件中的数据来恢复对应 key 的老数据版本,这样就保证了数据的安全性和一致性了。
func (*Transaction) AtomicBatch ¶
func (t *Transaction) AtomicBatch(callback func(txns *TxnState) error)
本次事物执行过程中新 keys 对应的新版本的 segment 进行持久化到 .db 文件中并且更新索引 inode 和 .db 文件映射关系。
func (*Transaction) Commit ¶
func (t *Transaction) Commit() error
这样下去启动的时候存储引擎就不会再去读取 .txn 文件了, 没有 .txn 文件了就说明没有未提交的事务了,事物执行成功了,一定要做版本控制检查器,检查每个数据的版本。 Commit 最重要一个环境就是对应 key 的新版本的 segment 进行持久化到 .db 文件中并且更新索引 inode 和 .db 文件映射关系。
func (*Transaction) Rollback ¶
func (t *Transaction) Rollback() error
这里 Rollback 是将缓冲区对应磁盘 .txn 中的数据写会到 .db 文件中, 写回操作要注意更新索引 inode 和 .db 文件映射关系,同样删除 .txn 文件, 这样下去启动的时候存储引擎就不会再去读取 .txn 文件了。
func (*Transaction) TxnID ¶
func (t *Transaction) TxnID() uint64