Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateError(key int, a ...interface{}) *mft.Error
- func GenerateErrorE(key int, err error, a ...interface{}) *mft.Error
- func SubscribeCopyUnique(src Queue, dst Queue, userSrc cn.CapUser, userDst cn.CapUser, ...) func(ctx context.Context) (isEmpty bool, err *mft.Error)
- type Message
- type MessageJsonBody
- type MessageOnlyMeta
- type MessageWithMeta
- type Queue
- type SimpleQueue
- func (q *SimpleQueue) Add(ctx context.Context, user cn.CapUser, message []byte, externalID int64, ...) (id int64, err *mft.Error)
- func (q *SimpleQueue) AddList(ctx context.Context, user cn.CapUser, messages []Message, saveMode cn.SaveMode) (ids []int64, err *mft.Error)
- func (q *SimpleQueue) AddUnique(ctx context.Context, user cn.CapUser, message []byte, externalID int64, ...) (id int64, err *mft.Error)
- func (q *SimpleQueue) AddUniqueList(ctx context.Context, user cn.CapUser, messages []Message, saveMode cn.SaveMode) (ids []int64, err *mft.Error)
- func (q *SimpleQueue) DeleteBlocks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)
- func (q *SimpleQueue) Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*MessageWithMeta, err *mft.Error)
- func (q *SimpleQueue) GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int, ...) (messages []*MessageWithMeta, lastId int64, err *mft.Error)
- func (q *SimpleQueue) Mutex() *mfs.PMutex
- func (q *SimpleQueue) MutexBlockSaveWait() *mfs.PMutex
- func (q *SimpleQueue) MutexFileSave() *mfs.PMutex
- func (q *SimpleQueue) Save(ctx context.Context, user cn.CapUser) (err *mft.Error)
- func (q *SimpleQueue) SaveAll(ctx context.Context, user cn.CapUser) (err *mft.Error)
- func (q *SimpleQueue) SaveSubscribers(ctx context.Context, user cn.CapUser) (err *mft.Error)
- func (q *SimpleQueue) SetDelete(ctx context.Context, user cn.CapUser, ...) (err *mft.Error)
- func (q *SimpleQueue) SetMarks(ctx context.Context, user cn.CapUser, ...) (err *mft.Error)
- func (q *SimpleQueue) SetMaxExtID(source string, extID int64)
- func (q *SimpleQueue) SetUnload(ctx context.Context, user cn.CapUser, ...) (err *mft.Error)
- func (q *SimpleQueue) SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
- func (q *SimpleQueue) SubscriberGetLastRead(ctx context.Context, user cn.CapUser, subscriber string) (id int64, err *mft.Error)
- func (q *SimpleQueue) SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser, id int64) (cnt int, err *mft.Error)
- func (q *SimpleQueue) SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
- func (q *SimpleQueue) SubscriberSetLastRead(ctx context.Context, user cn.CapUser, subscriber string, id int64, ...) (err *mft.Error)
- func (q *SimpleQueue) UpdateMarks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)
- type SimpleQueueBlock
- func (block *SimpleQueueBlock) Mutex() *mfs.PMutex
- func (block *SimpleQueueBlock) MutexFileSave() *mfs.PMutex
- func (block *SimpleQueueBlock) Save(ctx context.Context, q *SimpleQueue) (err *mft.Error)
- func (block *SimpleQueueBlock) Unload(ctx context.Context, q *SimpleQueue) (isNotSave bool, err *mft.Error)
- type SimpleQueueMessage
- type SimpleQueueSubscriberInfo
- type SimpleQueueSubscribers
Constants ¶
const BlockPostfixFileName = ".json"
BlockPostfixFileName - postfix file name with queue block
const BlockPrefixFileName = "bl_"
BlockPrefixFileName - prefix file name with queue block
const MetaDataFileName = "q.json"
MetaDataFileName - file name with metadata queue info
const SubscribersFileName = "subscr.json"
SubscribersFileName - file name with subscribers queue info
Variables ¶
var Errors map[int]string = map[int]string{
10010000: "SimpleQueue.Add: read lock queue fail wait",
10010001: "SimpleQueueBlock.add: lock queue block fail wait",
10010002: "SimpleQueueBlock.canAppend: read lock queue block fail wait",
10010003: "SimpleQueue.checkAndAddCurrentBlockForWrite: promote queue fail wait",
10010004: "SimpleQueue.Add: mxBlockSaveWait lock queue a fail wait",
10010005: "SimpleQueue.Add: chWaitSaveMeta fail wait",
10010006: "SimpleQueue.Add: chWaitBlockSave fail wait",
10010007: "SimpleQueueBlock.add: block IsUnload !!!!",
10010008: "SimpleQueueBlock.add: externat time in future ext time: %v now:%v",
10010009: "SimpleQueue.Add: segment %v is out of valid segments",
10010010: "SimpleQueue.Add: save mode %v is not allowed",
10011000: "SimpleQueue.getBlockForNext: block RLock fail wait",
10011001: "SimpleQueueBlock.getItemsAfter: block RLock fail wait",
10011002: "SimpleQueue.Get: queue RLock fail wait",
10012000: "SimpleQueue.Save: queue Lock FileSave mutex fail wait",
10012001: "SimpleQueue.Save: queue RLock fail wait",
10012002: "SimpleQueue.Save: queue marshal fail",
10012003: "SimpleQueue.Save: file %v save fail",
10012004: "SimpleQueue.Save: queue Lock fail wait",
10013000: "SimpleQueueBlock.Save: block Lock FileSave mutex fail wait",
10013001: "SimpleQueueBlock.Save: block RLock fail wait",
10013002: "SimpleQueueBlock.Save: block.data marshal fail",
10013003: "SimpleQueueBlock.Save: file %v save fail",
10013004: "SimpleQueueBlock.Save: block Lock fail wait",
10014000: "SimpleQueue.SaveAll: block Lock BlockSaveWait mutex fail wait",
10015000: "SimpleQueueBlock.deleteBlock: block Lock FileSave mutex fail wait",
10015001: "SimpleQueueBlock.deleteBlock: block RLock fail wait",
10015002: "SimpleQueueBlock.deleteBlock: block Promote to Lock fail wait",
10015003: "SimpleQueue.deleteBlock: queue Lock mutex fail wait",
10016000: "SimpleQueue.getStorageLock: queue RLock fail wait",
10017000: "SimpleQueueBlock.clearOldStorageBlock: block Lock FileSave mutex fail wait",
10017001: "SimpleQueue.clearOldStorageBlock: queue Lock mutex fail wait",
10018000: "SimpleQueueBlock.move: block Lock FileSave mutex fail wait",
10018001: "SimpleQueueBlock.move: block RLock fail wait",
10018002: "SimpleQueueBlock.move: block.data marshal fail",
10018003: "SimpleQueueBlock.move: file %v save fail",
10018004: "SimpleQueueBlock.move: queue Lock fail wait",
10018005: "SimpleQueueBlock.move: block Promote to Lock fail wait",
10019000: "SimpleQueueBlock.unload: block Lock FileSave mutex fail wait",
10019001: "SimpleQueueBlock.unload: block Lock fail wait",
10020000: "SimpleQueueBlock.load: block Lock FileSave mutex fail wait",
10020001: "SimpleQueueBlock.load: block Promote to Lock fail wait",
10020002: "SimpleQueueBlock.load: json Unmarchal Fail",
10020003: "SimpleQueueBlock.load: load from storage Fail file name: %v, mark:%v",
10021000: "SimpleQueue.SetUnload: queue RLock fail wait",
10022000: "SimpleQueueBlock.setNewStorage: block Lock FileSave mutex fail wait",
10022001: "SimpleQueueBlock.setNewStorage: queue Lock fail wait",
10022002: "SimpleQueueBlock.setNewStorage: block Lock fail wait",
10023000: "SimpleQueueBlock.setNeedDelete: block Lock FileSave mutex fail wait",
10023001: "SimpleQueueBlock.setNeedDelete: queue Lock fail wait",
10023002: "SimpleQueueBlock.setNeedDelete: block Lock fail wait",
10024000: "SimpleQueue.SetMarks: queue RLock fail wait",
10025000: "SimpleQueue.SetDelete: queue RLock fail wait",
10026000: "SimpleQueue.UpdateMarks: queue RLock fail wait",
10026001: "SimpleQueue.UpdateMarks: block RLock mutex fail wait",
10027000: "SimpleQueue.searchMaxExtID: queue RLock fail wait",
10027001: "SimpleQueue.searchMaxExtID: block RLock fail wait",
10028000: "SimpleQueue.searchExtID: queue RLock fail wait",
10028001: "SimpleQueue.searchExtID: block RLock fail wait",
10029000: "SimpleQueue.AddUnique: externalID should be != 0",
10029001: "SimpleQueue.AddUnique: queue Lock by source fail wait",
10030000: "LoadSimpleQueue() (*SimpleQueue): unmarshal queue info error",
10030001: "LoadSimpleQueue() (*SimpleQueue): unmarshal subscribers info error",
10031000: "SimpleQueue.SaveSubscribers: queue subscribers Lock FileSave mutex fail wait",
10031001: "SimpleQueue.SaveSubscribers: queue subscribers RLock fail wait",
10031002: "SimpleQueue.SaveSubscribers: queue subscribers marshal fail",
10031003: "SimpleQueue.SaveSubscribers: file %v save fail",
10031004: "SimpleQueue.SaveSubscribers: queue Lock fail wait",
10032000: "SimpleQueue.SubscriberSetLastRead: queue subscribers Lock fail wait",
10032001: "SimpleQueue.SubscriberSetLastRead: chWait fail wait",
10032002: "SimpleQueue.SubscriberSetLastRead: save mode %v is not allowed",
10033000: "SimpleQueue.SubscriberGetLastRead: queue subscribers RLock fail wait",
10033100: "SimpleQueue.SubscriberAddReplicaMember: queue subscribers Lock fail wait",
10033101: "SimpleQueue.SubscriberAddReplicaMember: save fail",
10033200: "SimpleQueue.SubscriberRemoveReplicaMember: queue subscribers Lock fail wait",
10033201: "SimpleQueue.SubscriberRemoveReplicaMember: save fail",
10033300: "SimpleQueue.SubscriberGetReplicaCount: queue subscribers RLock fail wait",
}
Errors codes and description
Functions ¶
func GenerateErrorE ¶
GenerateErrorE - creates a *mft.Error for the error code key, taking the source error err and optional arguments a.
func SubscribeCopyUnique ¶
func SubscribeCopyUnique(src Queue, dst Queue, userSrc cn.CapUser, userDst cn.CapUser, saveModeSrc cn.SaveMode, saveModeDst cn.SaveMode, subscriberName string, cntLimit int, doSaveDst bool, segments *segment.Segments, ) func(ctx context.Context) (isEmpty bool, err *mft.Error)
SubscribeCopyUnique subscribes to the source queue and copies its messages (AddUniqueList) to the destination queue.
Types ¶
type Message ¶
type Message struct {
ExternalID int64 `json:"eid,omitempty"`
ExternalDt int64 `json:"s_dt,omitempty"`
Message []byte `json:"msg"`
Source string `json:"src,omitempty"`
Segment int64 `json:"sg,omitempty"`
}
Message one message
func MJBToMessageList ¶
func MJBToMessageList(msgs []MessageJsonBody) (out []Message)
type MessageJsonBody ¶
type MessageJsonBody struct {
ID int64 `json:"id"`
Dt time.Time `json:"dt"`
ExternalID int64 `json:"external_id,omitempty"`
ExternalDt int64 `json:"message_ts,omitempty"`
Message json.RawMessage `json:"message"`
Source string `json:"source,omitempty"`
Segment int64 `json:"segment,omitempty"`
}
MessageJsonBody with json body
func MWMToMessageJBList ¶
func MWMToMessageJBList(msgs []*MessageWithMeta) (out []MessageJsonBody)
func (*MessageJsonBody) ToMessage ¶
func (msg *MessageJsonBody) ToMessage() Message
type MessageOnlyMeta ¶
type MessageOnlyMeta struct {
ID int64 `json:"id"`
// ExternalID - source id, when 0 then equal ID
ExternalID int64 `json:"eid,omitempty"`
ExternalDt int64 `json:"s_dt,omitempty"`
Dt time.Time `json:"dt"`
Source string `json:"src,omitempty"`
IsSaved bool `json:"is_saved"`
Segment int64 `json:"sg,omitempty"`
}
MessageOnlyMeta one message only meta
type MessageWithMeta ¶
type MessageWithMeta struct {
ID int64 `json:"id"`
// ExternalID - source id, when 0 then equal ID
ExternalID int64 `json:"eid,omitempty"`
ExternalDt int64 `json:"s_dt,omitempty"`
Dt time.Time `json:"dt"`
Message []byte `json:"msg"`
Source string `json:"src,omitempty"`
IsSaved bool `json:"is_saved"`
Segment int64 `json:"sg,omitempty"`
}
MessageWithMeta one message with meta
func (*MessageWithMeta) CopyOM ¶
func (msg *MessageWithMeta) CopyOM() *MessageOnlyMeta
CopyOM copies the message to a MessageOnlyMeta.
func (*MessageWithMeta) ToMessage ¶
func (msg *MessageWithMeta) ToMessage() Message
ToMessage converts the message to a Message.
func (*MessageWithMeta) ToMessageJB ¶
func (msg *MessageWithMeta) ToMessageJB() MessageJsonBody
type Queue ¶
type Queue interface {
Add(ctx context.Context, user cn.CapUser, message []byte,
externalID int64, externalDt int64, source string, segment int64,
saveMode cn.SaveMode) (id int64, err *mft.Error)
AddList(ctx context.Context, user cn.CapUser, messages []Message,
saveMode cn.SaveMode) (ids []int64, err *mft.Error)
// Get - gets at most cntLimit messages from the queue with id greater than idStart
// returns messages == nil when no elements
Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*MessageWithMeta, err *mft.Error)
// GetSegment - gets at most cntLimit messages from the queue with id greater than idStart
// returns messages == nil when no elements
// message should be in segment
// lastId is the ID of the last message read from the queue
GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int,
segments *segment.Segments,
) (messages []*MessageWithMeta, lastId int64, err *mft.Error)
// SaveAll saves all blocks waiting for save, the metadata, and everything else that is pending
SaveAll(ctx context.Context, user cn.CapUser) (err *mft.Error)
// AddUnique message to queue
// externalDt is unix time
// externalID is source id (should be != 0 !!!!)
AddUnique(ctx context.Context, user cn.CapUser, message []byte,
externalID int64, externalDt int64, source string, segment int64,
saveMode cn.SaveMode) (id int64, err *mft.Error)
AddUniqueList(ctx context.Context, user cn.CapUser, messages []Message,
saveMode cn.SaveMode) (ids []int64, err *mft.Error)
// SubscriberSetLastRead - set last read info
// if id == 0, the subscription is removed
SubscriberSetLastRead(ctx context.Context, user cn.CapUser,
subscriber string, id int64,
saveMode cn.SaveMode) (err *mft.Error)
// SubscriberGetLastRead - get last read info
SubscriberGetLastRead(ctx context.Context, user cn.CapUser, subscriber string) (id int64, err *mft.Error)
// SubscriberAddReplicaMember - add member to replica. Replica is group to control replication
SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
// SubscriberRemoveReplicaMember - remove member from replica. Replica is group to control replication
SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
// SubscriberGetReplicaCount - get how many members from replica get message. Replica is group to control replication
SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser, id int64) (cnt int, err *mft.Error)
}
Queue - queue of messages
type SimpleQueue ¶
type SimpleQueue struct {
Blocks []*SimpleQueueBlock `json:"blocks"`
// MetaStorage - storage for metadata (block info)
MetaStorage storage.Storage `json:"-"`
// SubscriberStorage - storage for subscriber info
SubscriberStorage storage.Storage `json:"-"`
// MarkerBlockDataStorage - storage for block data with marker
// case nil use MetaStorage
MarkerBlockDataStorage map[string]storage.Storage `json:"-"`
IDGenerator *mft.G `json:"-"`
CntLimit int `json:"cnt_limit,omitempty"`
TimeLimit time.Duration `json:"time_limit,omitempty"`
LenLimit int `json:"len_limit,omitempty"`
ChangesRv int64 `json:"-"`
SaveRv int64 `json:"-"`
SaveWait []chan bool `json:"-"`
SaveBlocks map[int64]*SimpleQueueBlock `json:"-"`
Subscribers *SimpleQueueSubscribers `json:"-"`
Source string `json:"-"`
Segments *segment.Segments `json:"segments,omitempty"`
DefaultSaveMode cn.SaveMode `json:"default_save_mod,omitempty"`
UseDefaultSaveModeForce bool `json:"use_default_save_mod_force,omitempty"`
// contains filtered or unexported fields
}
SimpleQueue - queue MVP
func CreateSimpleQueue ¶
func CreateSimpleQueue(cntLimit int, timeLimit time.Duration, lenLimit int, metaStorage storage.Storage, subscriberStorage storage.Storage, markerBlockDataStorage map[string]storage.Storage, idGenerator *mft.G) *SimpleQueue
CreateSimpleQueue creates a SimpleQueue. MetaStorage is the storage for metadata (block info). MarkerBlockDataStorage is the storage for block data with a marker; when it is nil, MetaStorage is used.
func LoadSimpleQueue ¶
func LoadSimpleQueue(ctx context.Context, metaStorage storage.Storage, subscriberStorage storage.Storage, markerBlockDataStorage map[string]storage.Storage, idGenerator *mft.G) (q *SimpleQueue, err *mft.Error)
LoadSimpleQueue load queue from storage
func (*SimpleQueue) Add ¶
func (q *SimpleQueue) Add(ctx context.Context, user cn.CapUser, message []byte, externalID int64, externalDt int64, source string, segment int64, saveMode cn.SaveMode) (id int64, err *mft.Error)
Add adds a message to the queue. externalDt is a unix time. externalID is the source id (should be != 0; 0 means it is not set).
func (*SimpleQueue) AddUnique ¶
func (q *SimpleQueue) AddUnique(ctx context.Context, user cn.CapUser, message []byte, externalID int64, externalDt int64, source string, segment int64, saveMode cn.SaveMode) (id int64, err *mft.Error)
AddUnique adds a message to the queue. externalDt is a unix time. externalID is the source id (it must be != 0).
func (*SimpleQueue) AddUniqueList ¶
func (*SimpleQueue) DeleteBlocks ¶
func (q *SimpleQueue) DeleteBlocks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)
DeleteBlocks deletes blocks (those with NeedDelete == true). blocksCount = 0 means unlimited.
func (*SimpleQueue) Get ¶
func (q *SimpleQueue) Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*MessageWithMeta, err *mft.Error)
Get - gets at most cntLimit messages from the queue with id greater than idStart. Returns messages == nil when there are no elements.
func (*SimpleQueue) GetSegment ¶
func (q *SimpleQueue) GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int, segments *segment.Segments, ) (messages []*MessageWithMeta, lastId int64, err *mft.Error)
GetSegment - gets at most cntLimit messages from the queue with id greater than idStart. Returns messages == nil when there are no elements. Each message should be in the given segments. lastId is the ID of the last message read from the queue.
func (*SimpleQueue) MutexBlockSaveWait ¶
func (q *SimpleQueue) MutexBlockSaveWait() *mfs.PMutex
MutexBlockSaveWait - returns Mutex for block save Wait
func (*SimpleQueue) MutexFileSave ¶
func (q *SimpleQueue) MutexFileSave() *mfs.PMutex
MutexFileSave - returns Mutex for FileSave
func (*SimpleQueue) Save ¶
Save saves the meta info of the queue. When MetaStorage == nil, it returns nil. When SaveRv == ChangesRv, it does nothing and returns nil.
func (*SimpleQueue) SaveSubscribers ¶
SaveSubscribers saves the subscribers info of the queue. When MetaStorage == nil, it returns nil. When SaveRv == ChangesRv, it does nothing and returns nil.
func (*SimpleQueue) SetDelete ¶
func (q *SimpleQueue) SetDelete(ctx context.Context, user cn.CapUser, setNeedDelete func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needDelete bool, err *mft.Error)) (err *mft.Error)
SetDelete - finds blocks and marks them for deletion. q needs to be saved (q.save(ctx)) after it is done. It finds the first block for which the func returns false; all blocks before that block are marked as needing deletion.
func (*SimpleQueue) SetMarks ¶
func (q *SimpleQueue) SetMarks(ctx context.Context, user cn.CapUser, setBlockMark func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needSetMark bool, nextMark string, err *mft.Error)) (err *mft.Error)
SetMarks - finds blocks and sets their marks. q needs to be saved (q.save(ctx)) after it is done.
func (*SimpleQueue) SetMaxExtID ¶
func (q *SimpleQueue) SetMaxExtID(source string, extID int64)
SetMaxExtID - set max external id
func (*SimpleQueue) SetUnload ¶
func (q *SimpleQueue) SetUnload(ctx context.Context, user cn.CapUser, setBlockUnload func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needUnload bool, err *mft.Error)) (err *mft.Error)
SetUnload - find and set blocks as unload
func (*SimpleQueue) SubscriberAddReplicaMember ¶
func (q *SimpleQueue) SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
SubscriberAddReplicaMember - add replication subscriber member
func (*SimpleQueue) SubscriberGetLastRead ¶
func (q *SimpleQueue) SubscriberGetLastRead(ctx context.Context, user cn.CapUser, subscriber string) (id int64, err *mft.Error)
SubscriberGetLastRead - get last read info
func (*SimpleQueue) SubscriberGetReplicaCount ¶
func (q *SimpleQueue) SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser, id int64) (cnt int, err *mft.Error)
SubscriberGetReplicaCount - get count of replication subscriber member
func (*SimpleQueue) SubscriberRemoveReplicaMember ¶
func (q *SimpleQueue) SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)
SubscriberRemoveReplicaMember - remove replication subscriber member
func (*SimpleQueue) SubscriberSetLastRead ¶
func (q *SimpleQueue) SubscriberSetLastRead(ctx context.Context, user cn.CapUser, subscriber string, id int64, saveMode cn.SaveMode) (err *mft.Error)
SubscriberSetLastRead - sets the last read info. If id == 0, the subscription is removed.
func (*SimpleQueue) UpdateMarks ¶
func (q *SimpleQueue) UpdateMarks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)
UpdateMarks moves blocks from storage to storage and clears the old storages. q needs to be saved (q.save(ctx)) after it is done. blocksCount = 0 means unlimited.
type SimpleQueueBlock ¶
type SimpleQueueBlock struct {
ID int64 `json:"id"`
Dt time.Time `json:"dt"`
Mark string `json:"mark"`
RemoveMarks []string `json:"rm_marks,omitempty"`
NextMark string `json:"next_mark"`
NeedDelete bool `json:"need_delete"`
Len int `json:"len"`
Data []*SimpleQueueMessage `json:"-"`
ChangesRv int64 `json:"-"`
SaveRv int64 `json:"-"`
IsUnload bool `json:"-"`
SaveWait []chan bool `json:"-"`
// May use without lock
LastGet time.Time `json:"-"`
// contains filtered or unexported fields
}
SimpleQueueBlock block with data
func (*SimpleQueueBlock) Mutex ¶
func (block *SimpleQueueBlock) Mutex() *mfs.PMutex
Mutex - returns Mutex
func (*SimpleQueueBlock) MutexFileSave ¶
func (block *SimpleQueueBlock) MutexFileSave() *mfs.PMutex
MutexFileSave - returns Mutex for FileSave
func (*SimpleQueueBlock) Save ¶
func (block *SimpleQueueBlock) Save(ctx context.Context, q *SimpleQueue) (err *mft.Error)
Save saves a block of the queue. When q.MetaStorage == nil, it returns nil. When block.SaveRv == block.ChangesRv, it does nothing and returns nil.
func (*SimpleQueueBlock) Unload ¶
func (block *SimpleQueueBlock) Unload(ctx context.Context, q *SimpleQueue) (isNotSave bool, err *mft.Error)
Unload - unload block from SimpleQueue
type SimpleQueueMessage ¶
type SimpleQueueMessage struct {
ID int64 `json:"id"`
// ExternalID, source id, when 0 then equal ID
ExternalID int64 `json:"eid,omitempty"`
ExternalDt int64 `json:"s_dt,omitempty"`
Dt time.Time `json:"dt"`
Message []byte `json:"msg,omitempty"`
Source string `json:"src,omitempty"`
Segment int64 `json:"sg,omitempty"`
}
SimpleQueueMessage one message
func (*SimpleQueueMessage) CopyWM ¶
func (msg *SimpleQueueMessage) CopyWM() *MessageWithMeta
CopyWM copies the message to a MessageWithMeta.
type SimpleQueueSubscriberInfo ¶
type SimpleQueueSubscriberInfo struct {
LastID int64 `json:"last_id"`
StartDt time.Time `json:"start_dt"`
LastDt time.Time `json:"last_dt"`
}
SimpleQueueSubscriberInfo - info about one subscriber
type SimpleQueueSubscribers ¶
type SimpleQueueSubscribers struct {
ChangesRv int64 `json:"-"`
SaveRv int64 `json:"-"`
SaveWait []chan bool `json:"-"`
SubscribersInfo map[string]*SimpleQueueSubscriberInfo `json:"si,omitempty"`
ReplicaSubscribers map[string]struct{} `json:"rs,omitempty"`
// contains filtered or unexported fields
}
SimpleQueueSubscribers line subscribers info