lws

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2022 License: Apache-2.0 Imports: 22 Imported by: 1

README

lws(log write system)日志写入系统

1. 介绍(什么是lws)

lws,日志写入系统,用于日志的写入和迭代读取

2. 为什么要自研lws

现有系统中使用的wal日志系统,要么结构比较死板,要么写入性能不尽人意。针对这种情况,并结合项目的使用场景,我们设计并开发了自己的高性能日志系统lws。

3. lws的特点

  • 支持对文件写入的大小设定和日志分割
  • 支持对特定文件的写入
  • 支持对所有日志的遍历读取,以及对指定文件的读取
  • 支持对日志数据的定制化序列化和反序列化
  • 针对不同场景,支持不同的日志写入策略(同步写入:日志先写入缓存,再同步写到系统,不刷盘;同步刷盘:每次写入都将日志数据刷盘;限额刷盘:累计写入x条日志后再刷盘;定时刷盘:定时将日志数据刷到磁盘)。默认情况下,lws将数据写入缓存,再以每秒一次的频率刷新到磁盘
  • 日志文件的自动清理机制
  • 底层抽象出多种文件使用方式,包括但不限于普通文件方式、内存映射方式(推荐/默认)、socket远程发送方式等

4. lws使用方式

  1. lws可选参数如下:
type Options struct {
    Wf                         WriteFlag //写日志标识  默认是定时1000ms刷盘
    FlushQuota                 int       //刷盘限定值 1000
    SegmentSize                uint64        //文件的大小限制 默认64M 0 代表不限制
    Ft                         FileType      //文件类型(1 普通文件 2 mmap) 默认映射方式
    BufferSize                 int //缓存大小 0代表不加缓存 注:mmapfile下不可为0
    LogFileLimitForPurge       int           //日志文件数量限制 用于自动清除多余文件,注文件个数包括新创建文件
    LogEntryCountLimitForPurge int           //日志条目数量限制 用于自动清除日志文件
    FilePrefix                 string  //日志文件的前缀 
    FileExtension              string //日志文件的后缀 默认wal
}
  2. 如果需要对日志对象进行序列化和反序列化操作,则需要注册Coder

    type Coder interface {
         Type() int8 //标识编码器的类型,用于编码此类型标识的对象
         Encode(interface{}) ([]byte, error)
         Decode([]byte) (interface{}, error)
     }
     RegisterCoder(Coder) error //将编码器注册到lws
     注:Type() <= 0 的类型为lws系统保留类型,其中0(RawCoderType)代表原始字节对象的类型
    
  3. 使用实例

    • 实例1
     l, err := Open("/root/go/src/lws/log", WithSegmentSize(30), WithFilePrex("test_"), WithWriteFlag(WF_SYNCFLUSH, 0), WithFileLimitForPurge(3))
     if err != nil {
         return err
     }
     data := []byte("hello world")
     err = l.WriteBytes(data)
     if err != nil {
         fmt.Println("write error:", err)
     }
     l.Flush()
    
     it := l.NewLogIterator()
     for it.HasNext() {
         data, err := it.Next().Get()
         if err != nil {
             fmt.Println("get error:", err)
         } else {
             fmt.Println("data:", string(data))
         }
     }
     it.Release()  //迭代器使用后要记得释放,否则会造成后台清理难以执行
    
     //如果要迭代步长n
     n := 10
     it2 := l.NewLogIterator()
     if it2.HasNextN(n) {
         data, err := it2.NextN(n).Get()
         if err != nil {
             fmt.Println("get error:", err)
         } else {
             fmt.Println("data:", string(data))
         }
     }
     l.Close()
    
    • 实例2
     type StudentCoder struct {
     }
    
     func (sc *StudentCoder) Type() int8 {
         return 1
     }
    
     func (sc *StudentCoder) Encode(s interface{}) ([]byte, error) {
         return json.Marshal(s)
     }
     func (sc *StudentCoder) Decode(data []byte) (interface{}, error) {
         var (
             s Student
         )
         err := json.Unmarshal(data, &s)
         if err != nil {
             return nil, err
         }
         return &s, nil
     }
    
     l, err := Open("/root/go/src/lws/log", WithSegmentSize(30), WithFilePrex("test_"))
     require.Nil(t, err)
     err = l.RegisterCoder(&StudentCoder{})
     require.Nil(t, err)
     s := Student{
         Name:  "lucy",
         Age:   10,
         Grade: 3,
         Class: 1,
     }
     for i := 0; i < 5; i++ {
         s.Age++
         err = l.Write(1, s)
         require.Nil(t, err)
     }
     l.Flush()
     it := l.NewLogIterator()
     it.SkipToLast()  //游标跳转至最新
     for i := 0; it.HasPre() && i < 5; i++ { //从后往前遍历
         obj, err := it.Previous().GetObj() //获取到解码后的对象
         if err != nil {
             t.Log("err:", err)
         } else {
             t.Log(obj)
         }
     }
     l.Close()
    

5. lws与tidwall/wal性能对比

  1. 环境:macOS,12核CPU,16GB内存

    场景 lws tidwall/wal
    同步写 140ns/op 3800ns/op
    同步刷盘 1.39ms/op 20.6ms/op
    定时刷盘 120ns/op 不支持
  2. lws与"github.com/tidwall/wal"在同步刷盘模式下的对比(lws附带crc32校验计算,wal没有),环境:macOS,12核CPU,16GB内存

    场景 lws wal
    30M 54ms/op 160ms/op
    50M 85ms/op 260ms/op
    80M 120ms/op 300ms/op
    100M 150ms/op 550ms/op

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCoderExist    = errors.New("this type coder has exist")
	ErrCoderNotExist = errors.New("this type coder not exist")
	ErrCodeSysType   = errors.New("the coder type is system reservation type")

	RawCoderType int8 = 0
)
View Source
var (
	ErrPurgeWorkExisted = errors.New("purge work has been performed")
	ErrPurgeNotReached  = errors.New("purge threshold not reached")

	InitID    = 1
	InitIndex = 1
)
View Source
var (
	ErrFileTypeNotSupport = errors.New("this file type is not supported")
	ErrSegmentIndex       = errors.New("index out of segment range")
)

Functions

This section is empty.

Types

type Chansema

type Chansema struct {
	// contains filtered or unexported fields
}

chan实现的信号量

func NewChansema

func NewChansema(n int) *Chansema

func (*Chansema) Acquire

func (cs *Chansema) Acquire(ctx context.Context) error

Acquire blocks until the semaphore is acquired, or until the context is canceled or times out.

func (*Chansema) Release

func (cs *Chansema) Release()

Release releases a previously acquired semaphore.

func (*Chansema) TryAcquire

func (cs *Chansema) TryAcquire() bool

TryAcquire attempts to acquire the semaphore without blocking; it returns true on success and false otherwise.

type Coder

type Coder interface {
	Type() int8
	Encode(interface{}) ([]byte, error)
	Decode([]byte) (interface{}, error)
}

type EntryContainer

type EntryContainer interface {
	FirstIndex() uint64
	LastIndex() uint64
	GetLogEntry(idx uint64) (*LogEntry, error)
	GetCoder(int8) (Coder, error)
	ReaderRelease()
}

type EntryElemnet

type EntryElemnet struct {
	// contains filtered or unexported fields
}

func (*EntryElemnet) Get

func (ele *EntryElemnet) Get() ([]byte, error)

func (*EntryElemnet) GetObj

func (ele *EntryElemnet) GetObj() (interface{}, error)

func (*EntryElemnet) Index

func (ele *EntryElemnet) Index() uint64

type EntryIterator

type EntryIterator struct {
	// contains filtered or unexported fields
}

func (*EntryIterator) HasNext

func (it *EntryIterator) HasNext() bool

func (*EntryIterator) HasNextN

func (it *EntryIterator) HasNextN(n int) bool

func (*EntryIterator) HasPre

func (it *EntryIterator) HasPre() bool

func (*EntryIterator) HasPreN

func (it *EntryIterator) HasPreN(n int) bool

func (*EntryIterator) Next

func (it *EntryIterator) Next() *EntryElemnet

func (*EntryIterator) NextN

func (it *EntryIterator) NextN(n int) *EntryElemnet

func (*EntryIterator) Previous

func (it *EntryIterator) Previous() *EntryElemnet

func (*EntryIterator) PreviousN

func (it *EntryIterator) PreviousN(n int) *EntryElemnet

func (*EntryIterator) Release

func (it *EntryIterator) Release()

func (*EntryIterator) SkipToFirst

func (it *EntryIterator) SkipToFirst()

func (*EntryIterator) SkipToLast

func (it *EntryIterator) SkipToLast()

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

FileLock is used to place an exclusive lock on a file.

func NewFileLocker

func NewFileLocker(path string) *FileLock

func (*FileLock) Lock

func (fl *FileLock) Lock() error

Lock tries to place an exclusive lock on the file without blocking; it returns nil on success and an error otherwise.

func (*FileLock) Unlock

func (fl *FileLock) Unlock() error

Unlock releases the exclusive lock.

type FileType

type FileType int
const (
	FT_NORMAL FileType = iota
	FT_MMAP
)

type FlushStrategy

type FlushStrategy int

type LogEntry

type LogEntry struct {
	Len   int //crc32 + typ + data总长度
	Crc32 uint32
	Typ   int8
	Data  []byte
}

type Lws

type Lws struct {
	// contains filtered or unexported fields
}

func Open

func Open(path string, opt ...Opt) (*Lws, error)

@title: Open @description: open a new lws instance @param {string} path 日志文件存放路径 @param {...Opt} opt 打开日志写入系统的参数配置 @return {*Lws} 日志写入系统实例句柄 @return {error} 错误信息

func OpenWithDSL

func OpenWithDSL(sl *dsl.DSL, opt ...Opt) (*Lws, error)

@title: OpenWithDSL @description: open a new lws instance with struct dsl @param {*dsl.DSL} 数据存储定位结构,其中包括协议及路径 @param {...Opt} opt 打开日志写入系统的参数配置 @return {*Lws} 日志写入系统实例句柄 @return {error} 错误信息

func (*Lws) Close

func (l *Lws) Close()

func (*Lws) Flush

func (l *Lws) Flush() error

@title: Flush @description: 手动将写入的日志条目强制刷盘 @return {error} 错误信息

func (*Lws) NewLogIterator

func (l *Lws) NewLogIterator() *EntryIterator

@title: NewLogIterator @description: 对日志写入系统的当前状态生成日志条目迭代器 @return {*EntryIterator} 日志条目迭代器

func (*Lws) Purge

func (l *Lws) Purge(opt ...PurgeOpt) error

@title: Purge @description: 根据配置的清理策略对日志文件进行清理 @param {...PurgeOpt} opt 清理选项,如PurgeWithAsync()(异步清理)、PurgeWithKeepFiles(c)、PurgeWithSoftEntries(c) @return {error} 错误信息

func (*Lws) ReadFromFile

func (l *Lws) ReadFromFile(file string) (*EntryIterator, error)

func (*Lws) RegisterCoder

func (l *Lws) RegisterCoder(c Coder) error

func (*Lws) UnregisterCoder

func (l *Lws) UnregisterCoder(t int8) error

func (*Lws) Write

func (l *Lws) Write(typ int8, obj interface{}) error

@title: Write @description: 将obj对象写入文件 @param {int8} typ 写入的数据类型 @param {interface{}} obj 数据 @return {error} 成功返回nil,错误返回错误详情

func (*Lws) WriteBytes

func (l *Lws) WriteBytes(data []byte) (uint64, error)

@title: WriteBytes @description: 将字节流写入文件 @param {[]byte} data 数据 @return {uint64, error} 成功返回entry的索引值和nil,失败返回0和err

func (*Lws) WriteRetIndex

func (l *Lws) WriteRetIndex(typ int8, obj interface{}) (uint64, error)

@title: WriteRetIndex @description: 将obj对象写入文件 @param {int8} typ 写入的数据类型 @param {interface{}} obj 数据 @return {uint64, error} 成功返回entry的索引值和nil,失败返回0和err

func (*Lws) WriteToFile

func (l *Lws) WriteToFile(file string, typ int8, obj interface{}) error

@title: WriteToFile @description: 将日志写入到特定的文件中,此日志文件名应避免与wal日志文件名冲突 @param {string} file 文件名 @param {int8} typ 写入的日志类型 @param {interface{}} obj 日志数据 @return {error} 错误信息

type LwsFile

type LwsFile interface {
	WriteAt([]byte, int64) (int, error)
	ReadAt([]byte, int64) (int, error)
	Truncate(int64) error
	Size() int64
	Sync() error
	Close() error
}

type Opt

type Opt func(*Options)

func WithBufferSize

func WithBufferSize(s int) Opt

func WithEntryLimitForPurge

func WithEntryLimitForPurge(l int) Opt

func WithFileExtension

func WithFileExtension(ext string) Opt

func WithFileLimitForPurge

func WithFileLimitForPurge(l int) Opt

func WithFilePrex

func WithFilePrex(prex string) Opt

func WithMmapFileLock

func WithMmapFileLock() Opt

func WithSegmentSize

func WithSegmentSize(s int64) Opt

func WithWriteFileType

func WithWriteFileType(ft FileType) Opt

func WithWriteFlag

func WithWriteFlag(wf WriteFlag, quota int) Opt

type Options

type Options struct {
	Wf                         WriteFlag //写日志标识
	FlushQuota                 int       //刷盘限定值
	SegmentSize                int64     //文件的大小限制 默认64M 代表不限制
	Ft                         FileType  //文件类型(1 普通文件 2 mmap) 默认1
	MmapFileLock               bool      //文件映射的时候,是否锁定内存以提高write速度
	BufferSize                 int
	LogFileLimitForPurge       int //存在日志文件限制
	LogEntryCountLimitForPurge int //存在日志条目限制
	FilePrefix                 string
	FileExtension              string
}

type PurgeOpt

type PurgeOpt func(*PurgeOptions)

func PurgeWithAsync

func PurgeWithAsync() PurgeOpt

func PurgeWithKeepFiles

func PurgeWithKeepFiles(c int) PurgeOpt

func PurgeWithSoftEntries

func PurgeWithSoftEntries(c int) PurgeOpt

type PurgeOptions

type PurgeOptions struct {
	// contains filtered or unexported fields
}

type ReaderCache

type ReaderCache struct {
	// contains filtered or unexported fields
}

ReaderCache reader缓存器

func (*ReaderCache) CleanReader

func (rc *ReaderCache) CleanReader()

func (*ReaderCache) DeleteReader

func (rc *ReaderCache) DeleteReader(segmentID uint64) *refReader

func (*ReaderCache) GetAndNewReader

func (rc *ReaderCache) GetAndNewReader(segmentID uint64, new func() (*refReader, error)) (*refReader, error)

GetAndNewReader 通过段ID获取reader,如果reader不存在,则通过new函数创建并添加到缓存中

func (*ReaderCache) GetReader

func (rc *ReaderCache) GetReader(segmentID uint64) *refReader

GetReader 通过段ID获取reader,不存在则返回nil

func (*ReaderCache) PutReader

func (rc *ReaderCache) PutReader(segmentID uint64, rr *refReader)

type Segment

type Segment struct {
	ID    uint64 //文件编号
	Size  int64  //文件当前大小
	Index uint64 //文件中日志的最小索引
	Path  string //文件路径
}

type SegmentGroup

type SegmentGroup []*Segment

func (*SegmentGroup) Append

func (sg *SegmentGroup) Append(s *Segment)

Append 追加条目

func (*SegmentGroup) Assign

func (sg *SegmentGroup) Assign(i int, s *Segment)

Assign 将索引i指定的值由s重新赋值

func (*SegmentGroup) At

func (sg *SegmentGroup) At(i int) *Segment

At 获取索引为i的数据,请确保i在[0,len)范围内

func (*SegmentGroup) Cap

func (sg *SegmentGroup) Cap() int

func (*SegmentGroup) FindAt

func (sg *SegmentGroup) FindAt(idx uint64) *Segment

FindAt 通过二分查找,找到idx所在的segment信息

func (*SegmentGroup) First

func (sg *SegmentGroup) First() *Segment

func (*SegmentGroup) ForEach

func (sg *SegmentGroup) ForEach(fn func(i int, s *Segment) bool)

ForEach 遍历SegmentGroup所有元素,并调用fn,fn返回true表示遍历终止

func (*SegmentGroup) Last

func (sg *SegmentGroup) Last() *Segment

func (*SegmentGroup) Len

func (sg *SegmentGroup) Len() int

func (*SegmentGroup) Reserved

func (sg *SegmentGroup) Reserved(n int)

Reserved 预留,会影响SegmentGroup的cap大小

func (*SegmentGroup) Resize

func (sg *SegmentGroup) Resize(n int)

Resize 预分配,会影响SegmentGroup的len大小

func (*SegmentGroup) Split

func (sg *SegmentGroup) Split(i int) (SegmentGroup, SegmentGroup)

Split 分割,将SegmentGroup分割成[0,i)和[i,len)两部分,如果i大于len,则返回[0,len)和nil

type SegmentProcessor

type SegmentProcessor struct {
	// contains filtered or unexported fields
}

func (*SegmentProcessor) Close

func (sp *SegmentProcessor) Close() error

type SegmentReader

type SegmentReader struct {
	*SegmentProcessor
	// contains filtered or unexported fields
}

func NewSegmentReader

func NewSegmentReader(s *Segment, ft FileType) (*SegmentReader, error)

func (*SegmentReader) FirstIndex

func (sr *SegmentReader) FirstIndex() uint64

FirstIndex 此文件段条目的起始索引

func (*SegmentReader) LastIndex

func (sr *SegmentReader) LastIndex() uint64

LastIndex 此文件段条目的结束索引

func (*SegmentReader) ReadLogByIndex

func (sr *SegmentReader) ReadLogByIndex(index uint64) (*LogEntry, error)

ReadLogByIndex 通过index获取到指定的日志条目

type SegmentWriter

type SegmentWriter struct {
	*SegmentProcessor
	// contains filtered or unexported fields
}

func NewSegmentWriter

func NewSegmentWriter(s *Segment, opt WriterOptions) (*SegmentWriter, error)

func (*SegmentWriter) Close

func (sw *SegmentWriter) Close() error

func (*SegmentWriter) EntryCount

func (sw *SegmentWriter) EntryCount() int

EntryCount 返回当前文件写入的总条目数

func (*SegmentWriter) Flush

func (sw *SegmentWriter) Flush() error

Flush 如果用户没有指定同步写文件操作,则需要将缓存数据回写到文件,再进行刷盘

func (*SegmentWriter) Replace

func (sw *SegmentWriter) Replace(s *Segment) error

Replace 根据s信息替换写入的文件,即文件切换 切换前会将老数据进行刷盘,并将文件大小调整到实际写入大小,然后打开一个新的文件,并替换老文件,如果打开出错,则保持老文件

func (*SegmentWriter) Size

func (sw *SegmentWriter) Size() int64

Size 获取文件当前的写入的大小,因为writer会预分配文件大小,所以使用write offset标识写入的大小值

func (*SegmentWriter) Write

func (sw *SegmentWriter) Write(t int8, data []byte) (int, error)

type WriteFlag

type WriteFlag int
const (
	WF_SYNCWRITE  WriteFlag = 1                  //同步写,写系统不刷盘
	WF_TIMEDFLUSH WriteFlag = (1<<iota - 1) << 1 //定时刷盘
	WF_QUOTAFLUSH                                //日志写入数量刷盘
	WF_SYNCFLUSH                                 //同步刷盘
)

type WriterOptions

type WriterOptions struct {
	SegmentSize int64
	Ft          FileType
	Wf          WriteFlag
	Fv          int
	MapLock     bool
	BufferSize  int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL