db

package
v0.0.0-...-325de3d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2021 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// OffsetSize is 8; presumably the byte width of an encoded offset
	// field (TODO confirm against the encoder, which is not visible here).
	OffsetSize = 8
	// LogDataLenSize is 4; presumably the byte width of the field holding
	// the length of a log's data (TODO confirm).
	LogDataLenSize = 4
	// AppliIndexSize is 8; presumably the byte width of an encoded applied
	// index, which is a uint64 elsewhere in this package (TODO confirm).
	AppliIndexSize = 8
	// LogMaxSize is the maximum size of a single log's data: 1024 * 1024
	// (1 MiB, assuming the unit is bytes).
	LogMaxSize = 1024 * 1024
)
View Source
// FileDefaultMode is the default permission mode, 0755 (rwxr-xr-x).
// NOTE(review): the call sites are not visible here; confirm whether it is
// applied to files, directories, or both.
const FileDefaultMode os.FileMode = 0755

FileDefaultMode is the default permission mode, 0755 (rwxr-xr-x).

Variables

View Source
var (
	// ErrorSegmentReadEnd is the sentinel error for reaching the end of a
	// segment file while reading. Compare against it with errors.Is rather
	// than by message text.
	ErrorSegmentReadEnd = errors.New("Segment file reading end")
	// ErrorReadFinished is the sentinel error reported once a message
	// backup file has been read to completion. The message previously
	// misspelled "backup" as "bakcup".
	ErrorReadFinished = errors.New("message backup file read finished")
)
View Source
var (
	// ErrorNotData is the sentinel error whose message is "no data".
	ErrorNotData = errors.New("no data")

	// MagicNumber is the two-byte marker 0x15, 0x16; presumably written at
	// the start of each log record (TODO confirm against the encoder).
	MagicNumber = [2]byte{0x15, 0x16} // tagged "lm" by the author; not the ASCII codes of 'l' and 'm'
	// EndMagicNumber is the one-byte marker 0x3; presumably the counterpart
	// that terminates what MagicNumber starts (TODO confirm).
	EndMagicNumber = [1]byte{0x3}
	// LogVersion is the log format version, currently 0x01.
	LogVersion = [1]byte{0x01}
	// SnapshotMagicNumber is the two-byte marker 0xb, 0xa; presumably marks
	// the start of snapshot data (TODO confirm).
	SnapshotMagicNumber = [2]byte{0xb, 0xa} // the hex digits "b", "a"
	// EndSnapshotMagicNumber is the one-byte marker 0xf; presumably marks
	// the end of snapshot data (TODO confirm).
	EndSnapshotMagicNumber = [1]byte{0xf}
	// BackupSlotMagicNumber is the two-byte marker 0xc, 0xd; presumably
	// marks slot data inside a backup stream (TODO confirm).
	BackupSlotMagicNumber = [2]byte{0xc, 0xd}
	// BackupMagicNumber is the byte sequence "---backup start ---".
	// NOTE(review): this is a package-level slice, so its contents are
	// mutable; callers must treat it as read-only.
	BackupMagicNumber = []byte("---backup start ---")
)
View Source
var (
	// ErrIndexCorrupt is the sentinel error whose message is
	// "corrupt index file".
	ErrIndexCorrupt = errors.New("corrupt index file")
	// Encoding is set to binary.BigEndian; presumably the byte order used
	// for the integers this package reads and writes (TODO confirm).
	Encoding = binary.BigEndian
)
View Source
var (
	// ErrorTokenNotFound is the sentinel error whose message is
	// "token not found"; presumably returned by the user token lookup
	// (TODO confirm against GetUserToken).
	ErrorTokenNotFound = errors.New("token not found")
)

Functions

func CopyFile

func CopyFile(dstName, srcName string) (written int64, err error)

CopyFile CopyFile

func GetDirList

func GetDirList(dirpath string) ([]string, error)

GetDirList GetDirList

func GetFileList

func GetFileList(dirpath string, suffix string) ([]string, error)

GetFileList GetFileList

func MarshalMessage

func MarshalMessage(m *Message) []byte

MarshalMessage MarshalMessage

func UnmarshalMessage

func UnmarshalMessage(data []byte, m *Message) error

UnmarshalMessage UnmarshalMessage

Types

type Conversation

// Conversation records the most recent conversation state of one user on
// one channel: unread count, last message identifiers and a data version.
type Conversation struct {
	UID             string // User UID (user who belongs to the most recent session)
	ChannelID       string // Conversation channel
	// ChannelType is the numeric type of the channel identified by ChannelID.
	ChannelType     uint8
	UnreadCount     int    // Number of unread messages
	Timestamp       int64  // Last session timestamp (10 digits)
	LastMsgSeq      uint32 // Sequence number of the last message
	LastClientMsgNo string // Last message client number
	LastMsgID       int64  // Last message ID
	Version         int64  // Data version
}

Conversation Conversation

func (*Conversation) String

func (c *Conversation) String() string

type DB

// DB is the storage interface of this package. Besides its own metadata,
// user token, conversation, node in-flight and slot backup methods, it
// embeds IMessageDB, IChannelDB, IDenyAndAllowlistStore and ISnapshot.
type DB interface {
	// Open opens the store.
	Open() error
	// Close closes the store.
	Close() error
	// SaveMetaData stores the raft applied index.
	SaveMetaData(appliIndex uint64) error
	// GetMetaData returns the stored applied index.
	GetMetaData() (uint64, error)
	// GetUserToken returns the token (and device level) of the user's
	// specified device.
	GetUserToken(uid string, deviceFlag lmproto.DeviceFlag) (token string, level lmproto.DeviceLevel, err error)
	// UpdateUserToken updates the token of the user's specified device.
	UpdateUserToken(uid string, deviceFlag lmproto.DeviceFlag, deviceLevel lmproto.DeviceLevel, token string) error

	// AddOrUpdateConversations adds or updates the given conversations for uid.
	AddOrUpdateConversations(uid string, conversations []*Conversation) error
	// GetConversations returns the conversations of uid.
	GetConversations(uid string) ([]*Conversation, error)

	// AddNodeInFlightData adds node in-flight data.
	AddNodeInFlightData(data []*NodeInFlightDataModel) error
	// GetNodeInFlightData returns the in-flight data delivered to nodes.
	GetNodeInFlightData() ([]*NodeInFlightDataModel, error)
	// ClearNodeInFlightData clears the node in-flight data.
	ClearNodeInFlightData() error

	// BackupSlots backs up the given slots; presumably the backup is
	// written to w (TODO confirm).
	BackupSlots(slots []byte, w io.Writer) error
	// RecoverSlotBackup restores a backup read from reader.
	RecoverSlotBackup(reader io.Reader) error

	IMessageDB
	IChannelDB
	IDenyAndAllowlistStore
	ISnapshot
}

DB DB

type Entry

// Entry pairs a RelativeOffset with a Position, both uint32.
// NOTE(review): presumably one record of an index file, with the offset
// stored relative to the index's base offset — confirm against Index.
type Entry struct {
	RelativeOffset uint32
	Position       uint32
}

Entry Entry

type FileDB

type FileDB struct {
	limlog.Log
	// contains filtered or unexported fields
}

FileDB FileDB

func NewFileDB

func NewFileDB(dataDir string, segmentMaxBytes int64, slotCount int) *FileDB

NewFileDB NewFileDB

func (*FileDB) AddAllowlist

func (f *FileDB) AddAllowlist(channelID string, channelType uint8, uids []string) error

AddAllowlist AddAllowlist

func (*FileDB) AddDenylist

func (f *FileDB) AddDenylist(channelID string, channelType uint8, uids []string) error

AddDenylist AddDenylist

func (*FileDB) AddNodeInFlightData

func (f *FileDB) AddNodeInFlightData(data []*NodeInFlightDataModel) error

AddNodeInFlightData adds node in-flight data.

func (*FileDB) AddOrUpdateChannel

func (f *FileDB) AddOrUpdateChannel(channelID string, channelType uint8, data map[string]interface{}) error

AddOrUpdateChannel AddOrUpdateChannel

func (*FileDB) AddOrUpdateConversations

func (f *FileDB) AddOrUpdateConversations(uid string, conversations []*Conversation) error

AddOrUpdateConversations AddOrUpdateConversations

func (*FileDB) AddSubscribers

func (f *FileDB) AddSubscribers(channelID string, channelType uint8, uids []string) error

AddSubscribers AddSubscribers

func (*FileDB) AppendMessage

func (f *FileDB) AppendMessage(m *Message) (int, error)

AppendMessage AppendMessage

func (*FileDB) AppendMessageOfNotifyQueue

func (f *FileDB) AppendMessageOfNotifyQueue(m *Message) error

AppendMessageOfNotifyQueue AppendMessageOfNotifyQueue

func (*FileDB) AppendMessageOfUser

func (f *FileDB) AppendMessageOfUser(m *Message) (int, error)

AppendMessageOfUser appends a message to the user's queue.

func (*FileDB) BackupSlots

func (f *FileDB) BackupSlots(slots []byte, w io.Writer) error

BackupSlots BackupSlots

func (*FileDB) ClearNodeInFlightData

func (f *FileDB) ClearNodeInFlightData() error

ClearNodeInFlightData ClearNodeInFlightData

func (*FileDB) Close

func (f *FileDB) Close() error

Close Close

func (*FileDB) DeleteChannel

func (f *FileDB) DeleteChannel(channelID string, channelType uint8) error

DeleteChannel DeleteChannel

func (*FileDB) DeleteChannelAndClearMessages

func (f *FileDB) DeleteChannelAndClearMessages(channelID string, channelType uint8) error

DeleteChannelAndClearMessages DeleteChannelAndClearMessages

func (*FileDB) DeleteMessages

func (f *FileDB) DeleteMessages(channelID string, channelType uint8) error

DeleteMessages DeleteMessages

func (*FileDB) ExistChannel

func (f *FileDB) ExistChannel(channelID string, channelType uint8) (bool, error)

ExistChannel ExistChannel

func (*FileDB) GetAllowlist

func (f *FileDB) GetAllowlist(channelID string, channelType uint8) ([]string, error)

GetAllowlist GetAllowlist

func (*FileDB) GetChannel

func (f *FileDB) GetChannel(channelID string, channelType uint8) (map[string]interface{}, error)

GetChannel GetChannel

func (*FileDB) GetConversations

func (f *FileDB) GetConversations(uid string) ([]*Conversation, error)

GetConversations GetConversations

func (*FileDB) GetDenylist

func (f *FileDB) GetDenylist(channelID string, channelType uint8) ([]string, error)

GetDenylist GetDenylist

func (*FileDB) GetLastMessages

func (f *FileDB) GetLastMessages(channelID string, channelType uint8, endOffset uint32, limit uint64) ([]*Message, error)

GetLastMessages returns the latest messages.

func (*FileDB) GetMessage

func (f *FileDB) GetMessage(channelID string, channelType uint8, messageSeq uint32) (*Message, error)

GetMessage GetMessage

func (*FileDB) GetMessageOfUserCursor

func (f *FileDB) GetMessageOfUserCursor(uid string) (uint32, error)

GetMessageOfUserCursor GetMessageOfUserCursor

func (*FileDB) GetMessages

func (f *FileDB) GetMessages(channelID string, channelType uint8, offset uint32, limit uint64) ([]*Message, error)

GetMessages returns messages.

func (*FileDB) GetMessagesOfNotifyQueue

func (f *FileDB) GetMessagesOfNotifyQueue(count int) ([]*Message, error)

GetMessagesOfNotifyQueue GetMessagesOfNotifyQueue

func (*FileDB) GetMessagesOfUser

func (f *FileDB) GetMessagesOfUser(uid string, offset uint32, limit uint64) ([]*Message, error)

GetMessagesOfUser returns the messages in the user's queue.

func (*FileDB) GetMetaData

func (f *FileDB) GetMetaData() (uint64, error)

GetMetaData GetMetaData

func (*FileDB) GetNextMessageSeq

func (f *FileDB) GetNextMessageSeq(channelID string, channelType uint8) (uint32, error)

GetNextMessageSeq GetNextMessageSeq

func (*FileDB) GetNodeInFlightData

func (f *FileDB) GetNodeInFlightData() ([]*NodeInFlightDataModel, error)

GetNodeInFlightData returns the in-flight data delivered to nodes.

func (*FileDB) GetSubscribers

func (f *FileDB) GetSubscribers(channelID string, channelType uint8) ([]string, error)

GetSubscribers GetSubscribers

func (*FileDB) GetUserNextMessageSeq

func (f *FileDB) GetUserNextMessageSeq(uid string) (uint32, error)

GetUserNextMessageSeq GetUserNextMessageSeq

func (*FileDB) GetUserToken

func (f *FileDB) GetUserToken(uid string, deviceFlag lmproto.DeviceFlag) (string, lmproto.DeviceLevel, error)

GetUserToken GetUserToken

func (*FileDB) Open

func (f *FileDB) Open() error

Open Open

func (*FileDB) PrepareSnapshot

func (f *FileDB) PrepareSnapshot() (*Snapshot, error)

PrepareSnapshot PrepareSnapshot

func (*FileDB) RecoverSlotBackup

func (f *FileDB) RecoverSlotBackup(reader io.Reader) error

RecoverSlotBackup restores a backup.

func (*FileDB) RemoveAllAllowlist

func (f *FileDB) RemoveAllAllowlist(channelID string, channelType uint8) error

RemoveAllAllowlist RemoveAllAllowlist

func (*FileDB) RemoveAllDenylist

func (f *FileDB) RemoveAllDenylist(channelID string, channelType uint8) error

RemoveAllDenylist RemoveAllDenylist

func (*FileDB) RemoveAllSubscriber

func (f *FileDB) RemoveAllSubscriber(channelID string, channelType uint8) error

RemoveAllSubscriber RemoveAllSubscriber

func (*FileDB) RemoveAllowlist

func (f *FileDB) RemoveAllowlist(channelID string, channelType uint8, uids []string) error

RemoveAllowlist RemoveAllowlist

func (*FileDB) RemoveDenylist

func (f *FileDB) RemoveDenylist(channelID string, channelType uint8, uids []string) error

RemoveDenylist RemoveDenylist

func (*FileDB) RemoveMessagesOfNotifyQueue

func (f *FileDB) RemoveMessagesOfNotifyQueue(messageIDs []int64) error

RemoveMessagesOfNotifyQueue RemoveMessagesOfNotifyQueue

func (*FileDB) RemoveSubscribers

func (f *FileDB) RemoveSubscribers(channelID string, channelType uint8, uids []string) error

RemoveSubscribers RemoveSubscribers

func (*FileDB) SaveMetaData

func (f *FileDB) SaveMetaData(appliIndex uint64) error

SaveMetaData SaveMetaData

func (*FileDB) SaveSnapshot

func (f *FileDB) SaveSnapshot(snapshot *Snapshot, w io.Writer) error

SaveSnapshot SaveSnapshot

func (*FileDB) Sync

func (f *FileDB) Sync() error

Sync Sync

func (*FileDB) UpdateMessageOfUserCursorIfNeed

func (f *FileDB) UpdateMessageOfUserCursorIfNeed(uid string, offset uint32) error

UpdateMessageOfUserCursorIfNeed UpdateMessageOfUserCursorIfNeed

func (*FileDB) UpdateUserToken

func (f *FileDB) UpdateUserToken(uid string, deviceFlag lmproto.DeviceFlag, deviceLevel lmproto.DeviceLevel, token string) error

UpdateUserToken UpdateUserToken

type IChannelDB

// IChannelDB is the channel part of the storage interface: channel records,
// their message sequence numbers and their subscribers.
type IChannelDB interface {
	// GetNextMessageSeq returns the next sequence number of the channel.
	GetNextMessageSeq(channelID string, channelType uint8) (uint32, error)
	// AddOrUpdateChannel adds or updates a channel.
	AddOrUpdateChannel(channelID string, channelType uint8, data map[string]interface{}) error
	// GetChannel returns the channel's data.
	GetChannel(channelID string, channelType uint8) (map[string]interface{}, error)
	// DeleteChannel deletes the channel.
	DeleteChannel(channelID string, channelType uint8) error
	// DeleteChannelAndClearMessages deletes the channel and clears its messages.
	DeleteChannelAndClearMessages(channelID string, channelType uint8) error
	// ExistChannel reports whether the specified channel exists.
	ExistChannel(channelID string, channelType uint8) (bool, error)
	// AddSubscribers adds subscribers.
	AddSubscribers(channelID string, channelType uint8, uids []string) error
	// RemoveSubscribers removes the subscribers with the given uids from
	// the specified channel.
	RemoveSubscribers(channelID string, channelType uint8, uids []string) error
	// GetSubscribers returns the subscriber list.
	GetSubscribers(channelID string, channelType uint8) ([]string, error)
	// RemoveAllSubscriber removes all subscribers of the specified channel.
	RemoveAllSubscriber(channelID string, channelType uint8) error
}

IChannelDB IChannelDB

type IDenyAndAllowlistStore

// IDenyAndAllowlistStore is the part of the storage interface that manages
// per-channel denylists and allowlists of user uids.
type IDenyAndAllowlistStore interface {
	// AddDenylist adds users to the channel's denylist.
	AddDenylist(channelID string, channelType uint8, uids []string) error
	// GetDenylist returns the channel's denylist.
	GetDenylist(channelID string, channelType uint8) ([]string, error)
	// RemoveDenylist removes the specified users from the channel's denylist.
	RemoveDenylist(channelID string, channelType uint8, uids []string) error
	// RemoveAllDenylist removes every denylist entry of the specified channel.
	RemoveAllDenylist(channelID string, channelType uint8) error
	// GetAllowlist returns the allowlist.
	GetAllowlist(channelID string, channelType uint8) ([]string, error)
	// AddAllowlist adds users to the allowlist.
	AddAllowlist(channelID string, channelType uint8, uids []string) error
	// RemoveAllowlist removes users from the allowlist.
	RemoveAllowlist(channelID string, channelType uint8, uids []string) error
	// RemoveAllAllowlist removes every allowlist entry of the specified channel.
	RemoveAllAllowlist(channelID string, channelType uint8) error
}

IDenyAndAllowlistStore IDenyAndAllowlistStore

type ILog

// ILog is the log record abstraction accepted by Segment.AppendLog,
// Segment.ReadAt, Topic.AppendLog and Topic.ReadLogAt. *Message provides
// all four methods.
type ILog interface {
	// GetAppliIndex returns the record's applied index.
	GetAppliIndex() uint64
	// Offset returns the record's offset.
	Offset() int64
	// Encode serializes the record to bytes.
	Encode() ([]byte, error)
	// Decode populates the record from data.
	Decode(data []byte) error
}

ILog ILog

type IMessageDB

// IMessageDB is the message part of the storage interface: channel queues,
// per-user queues and the notify queue.
type IMessageDB interface {
	// GetUserNextMessageSeq returns the next message sequence number for uid.
	GetUserNextMessageSeq(uid string) (uint32, error)
	// AppendMessage appends a message to the channel queue; n is the
	// actual number of bytes appended.
	AppendMessage(m *Message) (n int, err error)

	// AppendMessageOfUser appends a message to the user's queue.
	AppendMessageOfUser(m *Message) (int, error)
	// UpdateMessageOfUserCursorIfNeed updates the cursor of the user's
	// message queue, i.e. the position the user has read up to.
	UpdateMessageOfUserCursorIfNeed(uid string, offset uint32) error
	// GetMessageOfUserCursor(uid string) (uint32, error)
	// AppendMessageOfNotifyQueue appends a message to the notify queue.
	AppendMessageOfNotifyQueue(m *Message) error
	// GetMessagesOfNotifyQueue returns messages from the notify queue;
	// presumably at most count of them (TODO confirm).
	GetMessagesOfNotifyQueue(count int) ([]*Message, error)
	// RemoveMessagesOfNotifyQueue removes messages from the notify queue.
	RemoveMessagesOfNotifyQueue(messageIDs []int64) error
	// GetMessages returns messages.
	GetMessages(channelID string, channelType uint8, offset uint32, limit uint64) ([]*Message, error)
	// GetLastMessages returns the latest messages.
	GetLastMessages(channelID string, channelType uint8, endOffset uint32, limit uint64) ([]*Message, error)
	// GetMessagesOfUser returns the messages in the user's queue.
	GetMessagesOfUser(uid string, offset uint32, limit uint64) ([]*Message, error)
	// GetMessage returns the channel message with the given sequence number.
	GetMessage(channelID string, channelType uint8, messageSeq uint32) (*Message, error)
	// DeleteMessages deletes the messages of the specified channel.
	DeleteMessages(channelID string, channelType uint8) error
}

IMessageDB IMessageDB

type ISnapshot

// ISnapshot is the snapshot part of the storage interface.
type ISnapshot interface {
	// PrepareSnapshot returns a *Snapshot describing the state to be saved.
	PrepareSnapshot() (*Snapshot, error)
	// SaveSnapshot saves the given snapshot; presumably the data is
	// written to w (TODO confirm).
	SaveSnapshot(snapshot *Snapshot, w io.Writer) error
}

ISnapshot ISnapshot

type Index

type Index struct {
	limlog.Log
	// contains filtered or unexported fields
}

Index Index

func NewIndex

func NewIndex(path string, baseOffset int64) *Index

NewIndex NewIndex

func (*Index) Append

func (idx *Index) Append(offset int64, position int64) error

Append Append

func (*Index) Close

func (idx *Index) Close() error

Close Close

func (*Index) IsFull

func (idx *Index) IsFull() bool

IsFull Is full

func (*Index) Lookup

func (idx *Index) Lookup(targetOffset int64) (OffsetPosition, error)

Lookup Find the largest offset less than or equal to the given targetOffset and return a pair holding this offset and its corresponding physical file position

func (*Index) SanityCheck

func (idx *Index) SanityCheck() error

SanityCheck Sanity check

func (*Index) Sync

func (idx *Index) Sync() error

Sync Sync

func (*Index) TruncateEntries

func (idx *Index) TruncateEntries(number int) error

TruncateEntries TruncateEntries

type Message

// Message is a stored message. *Message provides the Encode, Decode,
// GetAppliIndex and Offset methods of ILog.
type Message struct {
	Header      uint8
	AppliIndex  uint64
	Version     uint8
	Setting     uint8
	MessageID   int64  // Server-side message ID (globally unique)
	MessageSeq  uint32 // Message sequence number (unique per user, monotonically increasing)
	ClientMsgNo string // Client-side unique identifier
	Timestamp   int32  // Server message timestamp (10 digits, second precision)
	FromUID     string // Sender UID
	QueueUID    string // Which user's queue the message is placed in; a non-empty QueueUID indicates write mode
	ChannelID   string // Channel ID
	ChannelType uint8  // Channel type
	Payload     []byte // Message content
}

Message Message

func (*Message) Decode

func (m *Message) Decode(data []byte) error

Decode Decode

func (*Message) Encode

func (m *Message) Encode() ([]byte, error)

Encode Encode

func (*Message) GetAppliIndex

func (m *Message) GetAppliIndex() uint64

GetAppliIndex GetAppliIndex

func (*Message) Offset

func (m *Message) Offset() int64

Offset Offset

type NodeInFlightDataModel

// NodeInFlightDataModel is one in-flight request addressed to a node.
type NodeInFlightDataModel struct {
	No     string // Unique request number
	NodeID int32  // ID of the node that receives the message
	// Req is the forwarded receive-packet request itself.
	Req    *rpc.ForwardRecvPacketReq
}

NodeInFlightDataModel NodeInFlightDataModel

type OffsetPosition

// OffsetPosition is the result of Index.Lookup: a pair holding an offset
// and its corresponding physical file position.
type OffsetPosition struct {
	Offset   int64
	Position int64
}

OffsetPosition OffsetPosition

type Pool

type Pool struct {
	limlog.Log
	// contains filtered or unexported fields
}

Pool Pool

func NewPool

func NewPool(dataDir string, segmentMaxBytes int64) *Pool

NewPool NewPool

func (*Pool) Close

func (p *Pool) Close() error

Close Close

func (*Pool) GetSlot

func (p *Pool) GetSlot(slot uint32) *Slot

GetSlot GetSlot

func (*Pool) Sync

func (p *Pool) Sync() error

Sync Sync

type Scanner

type Scanner struct {
	limlog.Log
	// contains filtered or unexported fields
}

Scanner Scanner

func NewScanner

func NewScanner() *Scanner

NewScanner NewScanner

func (*Scanner) Scan

func (s *Scanner) Scan(segmentPath string, endAppliIndex uint64, callback func(data []byte) error) (bool, error)

Scan Scan

type Segment

type Segment struct {
	limlog.Log

	// logMMap mmapgo.MMap
	sync.Mutex
	// contains filtered or unexported fields
}

Segment Segment

func NewSegment

func NewSegment(topicDir string, baseOffset int64) *Segment

NewSegment NewSegment

func (*Segment) AppendLog

func (s *Segment) AppendLog(log ILog) (int, error)

AppendLog AppendLog

func (*Segment) Close

func (s *Segment) Close() error

Close Close

func (*Segment) ReadAt

func (s *Segment) ReadAt(offset int64, log ILog) error

ReadAt ReadAt

func (*Segment) ReadLogs

func (s *Segment) ReadLogs(offset int64, limit uint64, callback func(data []byte) error) error

ReadLogs ReadLogs

func (*Segment) SanityCheck

func (s *Segment) SanityCheck() (int64, error)

SanityCheck Sanity check

func (*Segment) Sync

func (s *Segment) Sync() error

Sync Sync

type Slot

type Slot struct {
	limlog.Log
	// contains filtered or unexported fields
}

Slot Slot

func NewSlot

func NewSlot(dataDir string, slot uint32, segmentMaxBytes int64) *Slot

NewSlot NewSlot

func (*Slot) Close

func (s *Slot) Close() error

Close Close

func (*Slot) GetTopic

func (s *Slot) GetTopic(topic string) *Topic

GetTopic GetTopic

func (*Slot) Sync

func (s *Slot) Sync() error

Sync Sync

type Snapshot

// Snapshot carries the applied index a snapshot refers to. It is returned
// by PrepareSnapshot and passed to SaveSnapshot.
type Snapshot struct {
	AppliIndex uint64
}

Snapshot Snapshot

func NewSnapshot

func NewSnapshot(appliIndex uint64) *Snapshot

NewSnapshot NewSnapshot

type Topic

type Topic struct {
	limlog.Log

	sync.Mutex
	// contains filtered or unexported fields
}

Topic Topic

func NewTopic

func NewTopic(slotDir, topic string, segmentMaxBytes int64) *Topic

NewTopic NewTopic

func (*Topic) AppendLog

func (t *Topic) AppendLog(log ILog) (int, error)

AppendLog AppendLog

func (*Topic) Close

func (t *Topic) Close() error

Close Close

func (*Topic) Delete

func (t *Topic) Delete() error

Delete Delete

func (*Topic) GetLastSeq

func (t *Topic) GetLastSeq() (uint32, error)

func (*Topic) NextOffset

func (t *Topic) NextOffset() (uint32, error)

NextOffset NextOffset

func (*Topic) ReadLastLogs

func (t *Topic) ReadLastLogs(limit uint64, callback func(data []byte) error) error

ReadLastLogs reads the most recent logs (count bounded by limit).

func (*Topic) ReadLogAt

func (t *Topic) ReadLogAt(offset int64, log ILog) error

ReadLogAt ReadLogAt

func (*Topic) ReadLogs

func (t *Topic) ReadLogs(offset int64, limit uint64, callback func(data []byte) error) error

ReadLogs ReadLogs

func (*Topic) Sync

func (t *Topic) Sync() error

Sync Sync

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL