Documentation ¶
Index ¶
- Constants
- Variables
- func CopyFile(dstName, srcName string) (written int64, err error)
- func GetDirList(dirpath string) ([]string, error)
- func GetFileList(dirpath string, suffix string) ([]string, error)
- func MarshalMessage(m *Message) []byte
- func UnmarshalMessage(data []byte, m *Message) error
- type Conversation
- type DB
- type Entry
- type FileDB
- func (f *FileDB) AddAllowlist(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) AddDenylist(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) AddNodeInFlightData(data []*NodeInFlightDataModel) error
- func (f *FileDB) AddOrUpdateChannel(channelID string, channelType uint8, data map[string]interface{}) error
- func (f *FileDB) AddOrUpdateConversations(uid string, conversations []*Conversation) error
- func (f *FileDB) AddSubscribers(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) AppendMessage(m *Message) (int, error)
- func (f *FileDB) AppendMessageOfNotifyQueue(m *Message) error
- func (f *FileDB) AppendMessageOfUser(m *Message) (int, error)
- func (f *FileDB) BackupSlots(slots []byte, w io.Writer) error
- func (f *FileDB) ClearNodeInFlightData() error
- func (f *FileDB) Close() error
- func (f *FileDB) DeleteChannel(channelID string, channelType uint8) error
- func (f *FileDB) DeleteChannelAndClearMessages(channelID string, channelType uint8) error
- func (f *FileDB) DeleteMessages(channelID string, channelType uint8) error
- func (f *FileDB) ExistChannel(channelID string, channelType uint8) (bool, error)
- func (f *FileDB) GetAllowlist(channelID string, channelType uint8) ([]string, error)
- func (f *FileDB) GetChannel(channelID string, channelType uint8) (map[string]interface{}, error)
- func (f *FileDB) GetConversations(uid string) ([]*Conversation, error)
- func (f *FileDB) GetDenylist(channelID string, channelType uint8) ([]string, error)
- func (f *FileDB) GetLastMessages(channelID string, channelType uint8, endOffset uint32, limit uint64) ([]*Message, error)
- func (f *FileDB) GetMessage(channelID string, channelType uint8, messageSeq uint32) (*Message, error)
- func (f *FileDB) GetMessageOfUserCursor(uid string) (uint32, error)
- func (f *FileDB) GetMessages(channelID string, channelType uint8, offset uint32, limit uint64) ([]*Message, error)
- func (f *FileDB) GetMessagesOfNotifyQueue(count int) ([]*Message, error)
- func (f *FileDB) GetMessagesOfUser(uid string, offset uint32, limit uint64) ([]*Message, error)
- func (f *FileDB) GetMetaData() (uint64, error)
- func (f *FileDB) GetNextMessageSeq(channelID string, channelType uint8) (uint32, error)
- func (f *FileDB) GetNodeInFlightData() ([]*NodeInFlightDataModel, error)
- func (f *FileDB) GetSubscribers(channelID string, channelType uint8) ([]string, error)
- func (f *FileDB) GetUserNextMessageSeq(uid string) (uint32, error)
- func (f *FileDB) GetUserToken(uid string, deviceFlag lmproto.DeviceFlag) (string, lmproto.DeviceLevel, error)
- func (f *FileDB) Open() error
- func (f *FileDB) PrepareSnapshot() (*Snapshot, error)
- func (f *FileDB) RecoverSlotBackup(reader io.Reader) error
- func (f *FileDB) RemoveAllAllowlist(channelID string, channelType uint8) error
- func (f *FileDB) RemoveAllDenylist(channelID string, channelType uint8) error
- func (f *FileDB) RemoveAllSubscriber(channelID string, channelType uint8) error
- func (f *FileDB) RemoveAllowlist(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) RemoveDenylist(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) RemoveMessagesOfNotifyQueue(messageIDs []int64) error
- func (f *FileDB) RemoveSubscribers(channelID string, channelType uint8, uids []string) error
- func (f *FileDB) SaveMetaData(appliIndex uint64) error
- func (f *FileDB) SaveSnapshot(snapshot *Snapshot, w io.Writer) error
- func (f *FileDB) Sync() error
- func (f *FileDB) UpdateMessageOfUserCursorIfNeed(uid string, offset uint32) error
- func (f *FileDB) UpdateUserToken(uid string, deviceFlag lmproto.DeviceFlag, deviceLevel lmproto.DeviceLevel, ...) error
- type IChannelDB
- type IDenyAndAllowlistStore
- type ILog
- type IMessageDB
- type ISnapshot
- type Index
- func (idx *Index) Append(offset int64, position int64) error
- func (idx *Index) Close() error
- func (idx *Index) IsFull() bool
- func (idx *Index) Lookup(targetOffset int64) (OffsetPosition, error)
- func (idx *Index) SanityCheck() error
- func (idx *Index) Sync() error
- func (idx *Index) TruncateEntries(number int) error
- type Message
- type NodeInFlightDataModel
- type OffsetPosition
- type Pool
- type Scanner
- type Segment
- func (s *Segment) AppendLog(log ILog) (int, error)
- func (s *Segment) Close() error
- func (s *Segment) ReadAt(offset int64, log ILog) error
- func (s *Segment) ReadLogs(offset int64, limit uint64, callback func(data []byte) error) error
- func (s *Segment) SanityCheck() (int64, error)
- func (s *Segment) Sync() error
- type Slot
- type Snapshot
- type Topic
- func (t *Topic) AppendLog(log ILog) (int, error)
- func (t *Topic) Close() error
- func (t *Topic) Delete() error
- func (t *Topic) GetLastSeq() (uint32, error)
- func (t *Topic) NextOffset() (uint32, error)
- func (t *Topic) ReadLastLogs(limit uint64, callback func(data []byte) error) error
- func (t *Topic) ReadLogAt(offset int64, log ILog) error
- func (t *Topic) ReadLogs(offset int64, limit uint64, callback func(data []byte) error) error
- func (t *Topic) Sync() error
Constants ¶
const ( // OffsetSize OffsetSize OffsetSize = 8 // LogDataLenSize LogDataLenSize LogDataLenSize = 4 // AppliIndexSize AppliIndexSize AppliIndexSize = 8 // LogMaxSize Maximum size of a single log data LogMaxSize = 1024 * 1024 )
const FileDefaultMode os.FileMode = 0755
FileDefaultMode is the default file permission mode, 0755 (rwxr-xr-x).
Variables ¶
var ( // ErrorSegmentReadEnd ErrorSegmentReadEnd ErrorSegmentReadEnd = errors.New("Segment file reading end") // ErrorReadFinished ErrorReadFinished ErrorReadFinished = errors.New("message bakcup file read finished") )
var ( // ErrorNotData ErrorNotData ErrorNotData = errors.New("no data") // MagicNumber MagicNumber MagicNumber = [2]byte{0x15, 0x16} // lm // EndMagicNumber EndMagicNumber EndMagicNumber = [1]byte{0x3} // LogVersion log version LogVersion = [1]byte{0x01} // SnapshotMagicNumber SnapshotMagicNumber SnapshotMagicNumber = [2]byte{0xb, 0xa} // ba // EndSnapshotMagicNumber EndSnapshotMagicNumber EndSnapshotMagicNumber = [1]byte{0xf} // BackupSlotMagicNumber BackupSlotMagicNumber BackupSlotMagicNumber = [2]byte{0xc, 0xd} // BackupMagicNumber BackupMagicNumber BackupMagicNumber = []byte("---backup start ---") )
var ( // ErrIndexCorrupt ErrIndexCorrupt ErrIndexCorrupt = errors.New("corrupt index file") // Encoding Encoding Encoding = binary.BigEndian )
var ( // ErrorTokenNotFound ErrorTokenNotFound ErrorTokenNotFound = errors.New("token not found") )
Functions ¶
func GetFileList ¶
GetFileList GetFileList
func UnmarshalMessage ¶
UnmarshalMessage UnmarshalMessage
Types ¶
type Conversation ¶
type Conversation struct { UID string // User UID (user who belongs to the most recent session) ChannelID string // Conversation channel ChannelType uint8 UnreadCount int // Number of unread messages Timestamp int64 // Last session timestamp (10 digits) LastMsgSeq uint32 // Sequence number of the last message LastClientMsgNo string // Last message client number LastMsgID int64 // Last message ID Version int64 // Data version }
Conversation Conversation
func (*Conversation) String ¶
func (c *Conversation) String() string
type DB ¶
type DB interface { Open() error Close() error // SaveMetaData Application index of storage raft SaveMetaData(appliIndex uint64) error // GetMetaData Get Application index GetMetaData() (uint64, error) // GetUserToken 获取用户指定设备的token GetUserToken(uid string, deviceFlag lmproto.DeviceFlag) (token string, level lmproto.DeviceLevel, err error) // UpdateUserToken 更新用户指定设备的token UpdateUserToken(uid string, deviceFlag lmproto.DeviceFlag, deviceLevel lmproto.DeviceLevel, token string) error AddOrUpdateConversations(uid string, conversations []*Conversation) error GetConversations(uid string) ([]*Conversation, error) // 添加节点inflight数据 AddNodeInFlightData(data []*NodeInFlightDataModel) error // 获取投递给节点的inflight数据 GetNodeInFlightData() ([]*NodeInFlightDataModel, error) ClearNodeInFlightData() error // BackupSlots 备份slots BackupSlots(slots []byte, w io.Writer) error // RecoverSlotBackup 恢复备份 RecoverSlotBackup(reader io.Reader) error IMessageDB IChannelDB IDenyAndAllowlistStore ISnapshot }
DB DB
type FileDB ¶
FileDB FileDB
func (*FileDB) AddAllowlist ¶
AddAllowlist AddAllowlist
func (*FileDB) AddDenylist ¶
AddDenylist AddDenylist
func (*FileDB) AddNodeInFlightData ¶
func (f *FileDB) AddNodeInFlightData(data []*NodeInFlightDataModel) error
AddNodeInFlightData adds node in-flight data.
func (*FileDB) AddOrUpdateChannel ¶
func (f *FileDB) AddOrUpdateChannel(channelID string, channelType uint8, data map[string]interface{}) error
AddOrUpdateChannel AddOrUpdateChannel
func (*FileDB) AddOrUpdateConversations ¶
func (f *FileDB) AddOrUpdateConversations(uid string, conversations []*Conversation) error
AddOrUpdateConversations AddOrUpdateConversations
func (*FileDB) AddSubscribers ¶
AddSubscribers AddSubscribers
func (*FileDB) AppendMessage ¶
AppendMessage AppendMessage
func (*FileDB) AppendMessageOfNotifyQueue ¶
AppendMessageOfNotifyQueue AppendMessageOfNotifyQueue
func (*FileDB) AppendMessageOfUser ¶
AppendMessageOfUser appends a message to the user's queue.
func (*FileDB) BackupSlots ¶
BackupSlots BackupSlots
func (*FileDB) ClearNodeInFlightData ¶
ClearNodeInFlightData ClearNodeInFlightData
func (*FileDB) DeleteChannel ¶
DeleteChannel DeleteChannel
func (*FileDB) DeleteChannelAndClearMessages ¶
DeleteChannelAndClearMessages DeleteChannelAndClearMessages
func (*FileDB) DeleteMessages ¶
DeleteMessages DeleteMessages
func (*FileDB) ExistChannel ¶
ExistChannel ExistChannel
func (*FileDB) GetAllowlist ¶
GetAllowlist GetAllowlist
func (*FileDB) GetChannel ¶
GetChannel GetChannel
func (*FileDB) GetConversations ¶
func (f *FileDB) GetConversations(uid string) ([]*Conversation, error)
GetConversations GetConversations
func (*FileDB) GetDenylist ¶
GetDenylist GetDenylist
func (*FileDB) GetLastMessages ¶
func (f *FileDB) GetLastMessages(channelID string, channelType uint8, endOffset uint32, limit uint64) ([]*Message, error)
GetLastMessages gets the latest messages.
func (*FileDB) GetMessage ¶
func (f *FileDB) GetMessage(channelID string, channelType uint8, messageSeq uint32) (*Message, error)
GetMessage GetMessage
func (*FileDB) GetMessageOfUserCursor ¶
GetMessageOfUserCursor GetMessageOfUserCursor
func (*FileDB) GetMessages ¶
func (f *FileDB) GetMessages(channelID string, channelType uint8, offset uint32, limit uint64) ([]*Message, error)
GetMessages gets messages.
func (*FileDB) GetMessagesOfNotifyQueue ¶
GetMessagesOfNotifyQueue GetMessagesOfNotifyQueue
func (*FileDB) GetMessagesOfUser ¶
GetMessagesOfUser gets the messages in the user's queue.
func (*FileDB) GetNextMessageSeq ¶
GetNextMessageSeq GetNextMessageSeq
func (*FileDB) GetNodeInFlightData ¶
func (f *FileDB) GetNodeInFlightData() ([]*NodeInFlightDataModel, error)
GetNodeInFlightData gets the in-flight data delivered to nodes.
func (*FileDB) GetSubscribers ¶
GetSubscribers GetSubscribers
func (*FileDB) GetUserNextMessageSeq ¶
GetUserNextMessageSeq GetUserNextMessageSeq
func (*FileDB) GetUserToken ¶
func (f *FileDB) GetUserToken(uid string, deviceFlag lmproto.DeviceFlag) (string, lmproto.DeviceLevel, error)
GetUserToken GetUserToken
func (*FileDB) PrepareSnapshot ¶
PrepareSnapshot PrepareSnapshot
func (*FileDB) RecoverSlotBackup ¶
RecoverSlotBackup restores a backup.
func (*FileDB) RemoveAllAllowlist ¶
RemoveAllAllowlist RemoveAllAllowlist
func (*FileDB) RemoveAllDenylist ¶
RemoveAllDenylist RemoveAllDenylist
func (*FileDB) RemoveAllSubscriber ¶
RemoveAllSubscriber RemoveAllSubscriber
func (*FileDB) RemoveAllowlist ¶
RemoveAllowlist RemoveAllowlist
func (*FileDB) RemoveDenylist ¶
RemoveDenylist RemoveDenylist
func (*FileDB) RemoveMessagesOfNotifyQueue ¶
RemoveMessagesOfNotifyQueue RemoveMessagesOfNotifyQueue
func (*FileDB) RemoveSubscribers ¶
RemoveSubscribers RemoveSubscribers
func (*FileDB) SaveMetaData ¶
SaveMetaData SaveMetaData
func (*FileDB) SaveSnapshot ¶
SaveSnapshot SaveSnapshot
func (*FileDB) UpdateMessageOfUserCursorIfNeed ¶
UpdateMessageOfUserCursorIfNeed UpdateMessageOfUserCursorIfNeed
func (*FileDB) UpdateUserToken ¶
func (f *FileDB) UpdateUserToken(uid string, deviceFlag lmproto.DeviceFlag, deviceLevel lmproto.DeviceLevel, token string) error
UpdateUserToken UpdateUserToken
type IChannelDB ¶
type IChannelDB interface { // GetNextMessageSeq 获取频道的下一个序号 GetNextMessageSeq(channelID string, channelType uint8) (uint32, error) // AddOrUpdateChannel 添加或者更新频道 AddOrUpdateChannel(channelID string, channelType uint8, data map[string]interface{}) error // GetChannel 获取频道数据 GetChannel(channelID string, channelType uint8) (map[string]interface{}, error) // DeleteChannel 删除频道 DeleteChannel(channelID string, channelType uint8) error DeleteChannelAndClearMessages(channelID string, channelType uint8) error // ExistChannel 是否存在指定的频道 ExistChannel(channelID string, channelType uint8) (bool, error) // AddSubscribers 添加订阅者 AddSubscribers(channelID string, channelType uint8, uids []string) error // RemoveSubscribers 移除指定频道内指定uid的订阅者 RemoveSubscribers(channelID string, channelType uint8, uids []string) error // GetSubscribers 获取订阅者列表 GetSubscribers(channelID string, channelType uint8) ([]string, error) RemoveAllSubscriber(channelID string, channelType uint8) error }
IChannelDB IChannelDB
type IDenyAndAllowlistStore ¶
type IDenyAndAllowlistStore interface { // AddDenylist 添加频道黑名单 AddDenylist(channelID string, channelType uint8, uids []string) error // GetDenylist 获取频道黑名单列表 GetDenylist(channelID string, channelType uint8) ([]string, error) // RemoveDenylist 移除频道内指定用户的黑名单 RemoveDenylist(channelID string, channelType uint8, uids []string) error // RemoveAllDenylist 移除指定频道的所有黑名单 RemoveAllDenylist(channelID string, channelType uint8) error // GetAllowlist 获取白名单 GetAllowlist(channelID string, channelType uint8) ([]string, error) // AddAllowlist 添加白名单 AddAllowlist(channelID string, channelType uint8, uids []string) error // RemoveAllowlist 移除白名单 RemoveAllowlist(channelID string, channelType uint8, uids []string) error // RemoveAllAllowlist 移除指定频道的所有白名单 RemoveAllAllowlist(channelID string, channelType uint8) error }
IDenyAndAllowlistStore IDenyAndAllowlistStore
type ILog ¶
type ILog interface { GetAppliIndex() uint64 Offset() int64 Encode() ([]byte, error) Decode(data []byte) error }
ILog ILog
type IMessageDB ¶
type IMessageDB interface { GetUserNextMessageSeq(uid string) (uint32, error) // AppendMessage 追加消息到频道队列 n 为追加的实际字节数 AppendMessage(m *Message) (n int, err error) // AppendMessageOfUser 追加消息到用户队列 AppendMessageOfUser(m *Message) (int, error) // UpdateMessageOfUserCursorIfNeed 更新用户消息队列的游标,用户读到的位置 UpdateMessageOfUserCursorIfNeed(uid string, offset uint32) error // GetMessageOfUserCursor(uid string) (uint32, error) // AppendMessageOfNotifyQueue 追加消息到通知队列 AppendMessageOfNotifyQueue(m *Message) error GetMessagesOfNotifyQueue(count int) ([]*Message, error) // RemoveMessagesOfNotifyQueue 从通知队列里移除消息 RemoveMessagesOfNotifyQueue(messageIDs []int64) error // GetMessages 获取消息 GetMessages(channelID string, channelType uint8, offset uint32, limit uint64) ([]*Message, error) GetLastMessages(channelID string, channelType uint8, endOffset uint32, limit uint64) ([]*Message, error) GetMessagesOfUser(uid string, offset uint32, limit uint64) ([]*Message, error) GetMessage(channelID string, channelType uint8, messageSeq uint32) (*Message, error) DeleteMessages(channelID string, channelType uint8) error }
IMessageDB IMessageDB
type ISnapshot ¶
type ISnapshot interface { // PrepareSnapshot PrepareSnapshot PrepareSnapshot() (*Snapshot, error) // SaveSnapshot SaveSnapshot SaveSnapshot(snapshot *Snapshot, w io.Writer) error }
ISnapshot ISnapshot
type Index ¶
Index Index
func (*Index) Lookup ¶
func (idx *Index) Lookup(targetOffset int64) (OffsetPosition, error)
Lookup finds the largest offset less than or equal to the given targetOffset and returns a pair holding this offset and its corresponding physical file position.
func (*Index) TruncateEntries ¶
TruncateEntries TruncateEntries
type Message ¶
type Message struct { Header uint8 AppliIndex uint64 Version uint8 Setting uint8 MessageID int64 // 服务端的消息ID(全局唯一) MessageSeq uint32 // 消息序列号 (用户唯一,有序递增) ClientMsgNo string // 客户端唯一标示 Timestamp int32 // 服务器消息时间戳(10位,到秒) FromUID string // 发送者UID QueueUID string // 放入到那个用户的队列内 如果QueueUID有值说明是写模式 ChannelID string // 频道ID ChannelType uint8 // 频道类型 Payload []byte // 消息内容 }
Message Message
func (*Message) GetAppliIndex ¶
GetAppliIndex GetAppliIndex
type NodeInFlightDataModel ¶
type NodeInFlightDataModel struct { No string // 请求唯一编号 NodeID int32 // 接受消息的节点ID Req *rpc.ForwardRecvPacketReq }
NodeInFlightDataModel NodeInFlightDataModel
type OffsetPosition ¶
OffsetPosition OffsetPosition
type Segment ¶
type Segment struct { limlog.Log // logMMap mmapgo.MMap sync.Mutex // contains filtered or unexported fields }
Segment Segment
func (*Segment) SanityCheck ¶
SanityCheck Sanity check