Documentation ¶
Index ¶
- Constants
- Variables
- func CheckAndRestoreDataDir(dir string) error
- func IsFileExist(filePath string) bool
- func Min(a, b int64) int64
- func MockIndexTornWrite(index *Index, count int64)
- func MockSegmentTornWrite(s *Segment, count int64)
- func RecordsToRaftLog(records []*Record, maxSize uint64) []raftpb.Entry
- func RecordsToVdlLog(records []*Record, maxSize int32) []logstream.Entry
- type FileMeta
- type FlagFile
- type Index
- type IndexEntry
- type LogFile
- type LogRange
- type LogStore
- func (s *LogStore) Close() error
- func (s *LogStore) CreateSnapshotMeta(applyIndex uint64) ([]*logstream.SegmentFile, error)
- func (s *LogStore) DeleteFiles(segmentNames []string) error
- func (s *LogStore) DeleteRaftLog(rindex uint64) error
- func (s *LogStore) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
- func (s *LogStore) FetchLogStreamMessages(startVindex int64, endRindex uint64, maxBytes int32) ([]logstream.Entry, error, bool)
- func (s *LogStore) FirstIndex() (uint64, error)
- func (s *LogStore) FirstVindex() (int64, error)
- func (s *LogStore) GetEntriesInMemCache(count int) ([]raftpb.Entry, error)
- func (s *LogStore) GetFirstSegment() *Segment
- func (s *LogStore) GetLastSegment() *Segment
- func (s *LogStore) GetStartSegmentByVindex(startVindex int64) (*Segment, bool)
- func (s *LogStore) GetVindexByRindex(rindex uint64) (int64, error)
- func (s *LogStore) IsNewStore() bool
- func (s *LogStore) LastIndex() (uint64, error)
- func (s *LogStore) LastVindex() (int64, error)
- func (s *LogStore) MaxVindex(maxRindex uint64) (int64, error)
- func (s *LogStore) MinVindex() (int64, error)
- func (s *LogStore) ReadRaftLogByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, error)
- func (s *LogStore) ReadRaftLogFromSegments(start, end uint64, maxSize uint64) ([]raftpb.Entry, error)
- func (s *LogStore) ReadRecordsByVindex(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, error, bool)
- func (s *LogStore) ReadVdlLogFromSegments(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, error)
- func (s *LogStore) SegmentCount() int
- func (s *LogStore) Snapshot() (raftpb.Snapshot, error)
- func (s *LogStore) StoreEntries(entries []raftpb.Entry) error
- func (s *LogStore) Term(i uint64) (uint64, error)
- func (s *LogStore) WriteRecords(records []*Record, recordSize int64) error
- type LogStoreConfig
- type MemCache
- func (m *MemCache) DeleteRecordsByRindex(rindex uint64)
- func (m *MemCache) GetFirstRindex() uint64
- func (m *MemCache) GetFirstVindex() int64
- func (m *MemCache) GetLastRindex() uint64
- func (m *MemCache) GetLastVindex() int64
- func (m *MemCache) GetRaftLogByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, bool)
- func (m *MemCache) GetRecords(count int) []*Record
- func (m *MemCache) GetVdlLogByVindex(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, bool)
- func (m *MemCache) LoadRecords(records []*Record)
- func (m *MemCache) WriteRecords(records []*Record)
- type NewIndexConfig
- type NewSegmentConfig
- type OpenIndexConfig
- type OpenMode
- type OpenSegmentConfig
- type OpenSegmentsConfig
- type ParseIndexResult
- type RangeFile
- type RangeInfo
- type Record
- type RequestType
- type Segment
- func NewSegment(cfg *NewSegmentConfig) (*Segment, error)
- func OpenSegment(cfg *OpenSegmentConfig) (*Segment, error)
- func OpenSegmentWithRead(cfg *OpenSegmentConfig) (*Segment, error)
- func OpenSegmentWithWrite(cfg *OpenSegmentConfig) (*Segment, error)
- func OpenSegments(cfg *OpenSegmentsConfig) ([]*Segment, error)
- func (s *Segment) Close() error
- func (s *Segment) DeleteRecordsByRindex(start uint64) error
- func (s *Segment) GetEndPositionByRindex(rindex uint64) (int64, int64, error)
- func (s *Segment) GetFirstRindex() uint64
- func (s *Segment) GetFirstVindex() int64
- func (s *Segment) GetLastRindex() uint64
- func (s *Segment) GetLastVindex() int64
- func (s *Segment) GetMaxBytes() int64
- func (s *Segment) GetMaxVindexByRindex(rindex uint64) (int64, error)
- func (s *Segment) GetName() string
- func (s *Segment) GetVindexByRindex(rindex uint64) (int64, error)
- func (s *Segment) ReOpenWithWrite() error
- func (s *Segment) ReadRaftLogsByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, uint64, error)
- func (s *Segment) ReadVdlLogsByVindex(startVindex int64, endRindex uint64, maxSize int32) (*VdlResult, error)
- func (s *Segment) RebuildIndexFile() (*ParseIndexResult, error)
- func (s *Segment) Remove() error
- func (s *Segment) SetReadOnly()
- func (s *Segment) SyncIndexFile() error
- func (s *Segment) WriteRecords(records []*Record) error
- type SegmentStatus
- type VdlResult
Constants ¶
const ( IndexEntrySize = 36 PageSize = 4096 IndexCountPerPage = PageSize / IndexEntrySize //113 )
const ( MetaDataType int32 = iota RaftLogType VdlLogType )
const ( TermSize = 8 RindexSize = 8 RaftTypeSize = 4 RaftLogHeaderSize = 20 RecordHeaderSize = 44 )
The record fields are arranged in the following order: crc|record_type|data_length|vindex|term|rindex|raft_type|data
Variables ¶
var ( ErrFileNotFound = errors.New("logstore: file not found") ErrArgsNotAvailable = errors.New("logstore: args not available") ErrOutOfRange = errors.New("logstore:starVindex is out of range") ErrFileExist = errors.New("logstore:file already exists") ErrFileNotExist = errors.New("logstore:the file or dir not exists") ErrEntryNotExist = errors.New("logstore:entry not exists") ErrCrcNotMatch = errors.New("logstore: Crc32 values do not match") ErrFieldNotMatch = errors.New("logstore: index entry field do not match record") ErrBadSegmentName = errors.New("logstore:bad segment name") ErrBadIndexName = errors.New("logstore:bad index name") ErrNoFileMeta = errors.New("logstore:segment has no metadata") ErrNotAllowDelete = errors.New("logstore:segment doesn't allow to delete") ErrNotAllowWrite = errors.New("logstore:segment doesn't allow to write") ErrRangeNotExist = errors.New("logstore:out of the range of logstore") ErrTornWrite = errors.New("logstore: file exist an incomplete write in the end") ErrSegmentClosed = errors.New("logstore: the segment is closed") ErrRangeMetadataDestory = errors.New("logstore: the range metadata file is destory") ErrNotContinuous = errors.New("logstore:segments are not continuous") )
var ( MaxSegmentSize int64 = 512 * 1024 * 1024 GetRecordDuration time.Duration )
var ( //logstore interface metric LogstoreReadTps = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "logstore", Subsystem: "read", Name: "read_tps", Help: "logstore read tps", }) LogstoreReadLatency = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "logstore", Subsystem: "read", Name: "read_latency", Help: "logstore read latency", MaxAge: conf.DefaultMetricsConfig.LogstoreReadLatencySummaryDuration, Objectives: map[float64]float64{0.99: 0.001}, }) LogstoreWriteTps = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "logstore", Subsystem: "write", Name: "write_tps", Help: "logstore write tps", }) LogstoreWriteLatency = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "logstore", Subsystem: "write", Name: "write_latency", Help: "logstore write latency", MaxAge: conf.DefaultMetricsConfig.LogstoreWriteLatencySummaryDuration, Objectives: map[float64]float64{0.99: 0.001}, }) LogstoreDeleteSegmentLatency = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "logstore", Subsystem: "delete", Name: "delete_segment_latency", Help: "logstore delete segment latency(ms)", MaxAge: conf.DefaultMetricsConfig.LogstoreDSLatencySummaryDuration, Objectives: map[float64]float64{0.99: 0.001}, }) //segment metric SegmentReadTps = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "logstore", Subsystem: "segment", Name: "read_tps", Help: "segment read tps", }) SegmentCutCounter = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "logstore", Subsystem: "segment", Name: "segment_cut_Interval", Help: "The count of segment cut", }) )
var ( NormalExitFlag string = "normal_exit" UnnormalExitFlag string = "unnormal_exit" )
var ( IndexFileSuffix = ".idx" LogFileSuffix = ".log" )
var Encoding = binary.BigEndian
var ( //512MB ReserveRecordMemory uint64 = 1024 * 1024 * 512 //reserve memory for tail records. )
Functions ¶
func CheckAndRestoreDataDir ¶
issue #119 检查数据目录下的segment,index和range的一致性,并删除多余的信息或文件
func IsFileExist ¶
func MockIndexTornWrite ¶
Only for fiu tests; do not use in a production environment.
func RecordsToRaftLog ¶
仅用于性能测试。Only for performance tests; don't use in production.
Types ¶
type FlagFile ¶
type FlagFile struct {
// contains filtered or unexported fields
}
func NewFlagFile ¶
func (*FlagFile) NeedRecover ¶
判断是否需要恢复index 文件,以下情形需要恢复: 1.exit_flag_file文件不存在 2.exit_flag_file文件中的内容不是normal_exit
type Index ¶
type Index struct { Name string IndexPath string IndexFile *fileutil.LockedFile Position int64 }
type IndexEntry ¶
type LogStore ¶
type LogStore struct { Dir string // the living directory of the underlay files Meta *FileMeta // metadata recorded at the head of each WAL SegmentSizeBytes int64 //the max size of segment file ReserveSegmentCount int IsNew bool Mu sync.RWMutex Segments []*Segment //log segment files,the last segment is for write only, others for read only Mc *MemCache //memory cache for The latest piece of records which including all the records in last segment RangeMetaFile *RangeFile ExitFlagFile *FlagFile }
func (*LogStore) CreateSnapshotMeta ¶
func (s *LogStore) CreateSnapshotMeta(applyIndex uint64) ([]*logstream.SegmentFile, error)
func (*LogStore) DeleteFiles ¶
删除segment文件和对应的index文件。只能从开始位置的segment开始顺序删除。
func (*LogStore) DeleteRaftLog ¶
删除rindex及之后的raft log,包含rindex
func (*LogStore) Entries ¶
Entries returns a slice of log entries in the range [lo,hi). MaxSize limits the total size of the log entries returned, but Entries returns at least one entry if any.
func (*LogStore) FetchLogStreamMessages ¶
func (s *LogStore) FetchLogStreamMessages(startVindex int64, endRindex uint64, maxBytes int32) ([]logstream.Entry, error, bool)
kafka接口调用该函数获取vdl log startVindex:表示获取vdl log的开始位置 endRindex:表示获取vdl log的最大rindex,小于等于该值 maxBytes:该次获取的vdl log总大小不能超过maxBytes,如果第一条vdl log就超过了maxBytes,则返回第一条 The returned bool reports whether the entries were read from the cache.
func (*LogStore) FirstIndex ¶
FirstIndex returns the first index written. 0 for no entries.
func (*LogStore) FirstVindex ¶
FirstVindex returns the first vindex written. -1 for no entries.
func (*LogStore) GetEntriesInMemCache ¶
仅用于测试使用,勿用于生产环境。Only for performance tests; don't use in production.
func (*LogStore) GetFirstSegment ¶
func (*LogStore) GetLastSegment ¶
func (*LogStore) GetStartSegmentByVindex ¶
根据startVindex获得对应的segment结构,同时返回该segment是否是最后一个segment
func (*LogStore) GetVindexByRindex ¶
根据rindex获取vindex
func (*LogStore) IsNewStore ¶
func (*LogStore) LastVindex ¶
LastVindex returns the last vindex written. -1 for no entries.
func (*LogStore) ReadRaftLogByRindex ¶
获取[start,end)范围的raft entry,并且总大小不超过maxSize。 范围和大小两个限制,有一个突破,则立即返回 如果第一条raft entry大小就超过maxSize,则返回第一条raft entry
func (*LogStore) ReadRaftLogFromSegments ¶
func (s *LogStore) ReadRaftLogFromSegments(start, end uint64, maxSize uint64) ([]raftpb.Entry, error)
从文件读取,获取[start,end)范围的raft entry,并且总大小不超过maxSize。 范围和大小两个限制,有一个突破,则立即返回 如果第一条raft entry大小就超过maxSize,则返回第一条raft entry
func (*LogStore) ReadRecordsByVindex ¶
func (s *LogStore) ReadRecordsByVindex(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, error, bool)
根据vindex获取对应的Entry startVindex:对应的vindex开始位置 endRindex:获取Entry的rindex,不超过endRindex,小于等于该值 maxSize:获取的Entry总大小不超过该值 范围和大小两个限制,有一个突破,则立即返回 如果第一条entry大小就超过maxSize,则返回第一条entry The returned bool reports whether the entries were read from the cache.
func (*LogStore) ReadVdlLogFromSegments ¶
func (s *LogStore) ReadVdlLogFromSegments(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, error)
从文件读取,根据vindex获取对应的Entry startVindex:对应的vindex开始位置 endRindex:获取Entry的rindex,不超过endRindex,小于等于该值 maxSize:获取的Entry总大小不超过该值 范围和大小两个限制,有一个突破,则立即返回 如果第一条entry大小就超过maxSize,则返回第一条entry
func (*LogStore) SegmentCount ¶
func (*LogStore) StoreEntries ¶
raft lib调用StoreEntries存储entries到logstore中
type LogStoreConfig ¶
type MemCache ¶
type MemCache struct { Mu sync.RWMutex //the last segment and logstore will operate the MemCache, so need a lock VindexToRecords []*Record //the last segment file records which cache in memeory RindexToRecords []*Record CacheSize uint64 FirstRindex uint64 LastRindex uint64 FirstVindex int64 LastVindex int64 }
func NewMemCache ¶
创建MemCache,CacheSize取决为:保留内存/单条消息最大Size
func (*MemCache) DeleteRecordsByRindex ¶
将MemCache中的数据全部清空
func (*MemCache) GetFirstRindex ¶
func (*MemCache) GetFirstVindex ¶
func (*MemCache) GetLastRindex ¶
func (*MemCache) GetLastVindex ¶
func (*MemCache) GetRaftLogByRindex ¶
根据范围和消息总大小限制获取raft entry 范围:[start,end)
func (*MemCache) GetRecords ¶
仅用于测试。Only for performance tests; don't use in production.
func (*MemCache) GetVdlLogByVindex ¶
func (m *MemCache) GetVdlLogByVindex(startVindex int64, endRindex uint64, maxSize int32) ([]logstream.Entry, bool)
获取从startVindex到endRindex之间的VDL Log,包括endRindex所在的记录 startVindex是VDL log index endRindex是raft log index
func (*MemCache) LoadRecords ¶
将records加载到MemCache中
func (*MemCache) WriteRecords ¶
type NewIndexConfig ¶
type NewSegmentConfig ¶
type OpenIndexConfig ¶
type OpenSegmentConfig ¶
type OpenSegmentsConfig ¶
type ParseIndexResult ¶
type ParseIndexResult struct { Entries []*IndexEntry FirstVindex int64 FirstRindex uint64 LastVindex int64 LastRindex uint64 }
type RangeFile ¶
type RangeFile struct {
// contains filtered or unexported fields
}
func NewRangeFile ¶
func (*RangeFile) AppendLogRange ¶
func (*RangeFile) DeleteLogRanges ¶
func (*RangeFile) GetRangeInfo ¶
type Record ¶
type Record struct { Crc uint32 // crc for the remainder field RecordType int32 // DataLen int64 //data length Vindex int64 Term uint64 Rindex uint64 RaftType raftpb.EntryType Data []byte }
Record store order Crc|RecordType|DataLen|Vindex|Term|Rindex|RaftType|Data
type Segment ¶
type Segment struct { Mu sync.RWMutex SegmentPath string //file path FirstVindex int64 FirstRindex uint64 LastVindex int64 //the last vdl log index LastRindex uint64 // the last raft log index InitFirstVindex int64 //if the segment has record, InitFirstVindex is equal FirstVindex Log *LogFile IndexFile *Index // the index for this segment Status SegmentStatus Mc *MemCache RangeMetaFile *RangeFile }
func NewSegment ¶
func NewSegment(cfg *NewSegmentConfig) (*Segment, error)
创建segment和index。Segment file names look like this: "0000000000000000.log"
func OpenSegment ¶
func OpenSegment(cfg *OpenSegmentConfig) (*Segment, error)
Opens an existing segment file and its index file.
func OpenSegmentWithRead ¶
func OpenSegmentWithRead(cfg *OpenSegmentConfig) (*Segment, error)
以只读方式打开segment
func OpenSegmentWithWrite ¶
func OpenSegmentWithWrite(cfg *OpenSegmentConfig) (*Segment, error)
以读写方式打开segment
func (*Segment) DeleteRecordsByRindex ¶
Only deletes records in the last segment; the caller must guarantee that rindex (start) is in this segment.
func (*Segment) GetEndPositionByRindex ¶
读取rindex在segment和index中对应记录的结束位置 用于记录snapshot中最后一个segment和index文件
func (*Segment) GetFirstRindex ¶
func (*Segment) GetFirstVindex ¶
func (*Segment) GetLastRindex ¶
func (*Segment) GetLastVindex ¶
func (*Segment) GetMaxBytes ¶
func (*Segment) GetMaxVindexByRindex ¶
获取rindex对应的最大vindex
func (*Segment) GetVindexByRindex ¶
根据rindex读取对应的vindex
func (*Segment) ReadRaftLogsByRindex ¶
func (s *Segment) ReadRaftLogsByRindex(start, end uint64, maxSize uint64) ([]raftpb.Entry, uint64, error)
读取[start,end)范围的raft log,同时满足整个raft log size大小不能超过maxSize,hasRead表示是否已经读取了部分raftlog 用于判断第一条raftlog 就超过maxSize时是否添加该raftlog 返回<raft_log,剩下应该读取的大小,error>
func (*Segment) ReadVdlLogsByVindex ¶
func (s *Segment) ReadVdlLogsByVindex(startVindex int64, endRindex uint64, maxSize int32) (*VdlResult, error)
根据vindex读取vdl log
func (*Segment) RebuildIndexFile ¶
func (s *Segment) RebuildIndexFile() (*ParseIndexResult, error)
只有在启动的时候,才有可能进行重建索引操作 所以不需要加锁
func (*Segment) SetReadOnly ¶
func (s *Segment) SetReadOnly()
func (*Segment) SyncIndexFile ¶
type SegmentStatus ¶
type SegmentStatus int8
const ( SegmentReadOnly SegmentStatus = iota SegmentRDWR SegmentClosed )