queue

package
v0.0.0-...-7662171 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// BlockPostfixFileName is the file-name postfix (extension) of a file that
// holds a queue block.
const BlockPostfixFileName = ".json"

BlockPostfixFileName - postfix file name with queue block

View Source
// BlockPrefixFileName is the file-name prefix of a file that holds a queue
// block.
const BlockPrefixFileName = "bl_"

BlockPrefixFileName - prefix file name with queue block

View Source
// MetaDataFileName is the name of the file that holds the queue metadata.
const MetaDataFileName = "q.json"

MetaDataFileName - file name with metadata queue info

View Source
// SubscribersFileName is the name of the file that holds the queue
// subscribers info.
const SubscribersFileName = "subscr.json"

SubscribersFileName - file name with subscribers queue info

Variables

View Source
// Errors maps numeric error codes to their message templates.
//
// Codes are grouped in numeric ranges by operation, and every message starts
// with the name of the function or method it is reported for (for example
// "SimpleQueue.Add: ..."). Most entries describe a lock, read-lock or channel
// wait that failed; the rest describe marshal/unmarshal, storage and argument
// validation failures.
//
// Some templates contain fmt-style %v verbs; presumably they are filled from
// the variadic arguments of GenerateError / GenerateErrorE - TODO confirm.
var Errors map[int]string = map[int]string{
	10010000: "SimpleQueue.Add: read lock queue fail wait",
	10010001: "SimpleQueueBlock.add: lock queue block fail wait",
	10010002: "SimpleQueueBlock.canAppend: read lock queue block fail wait",
	10010003: "SimpleQueue.checkAndAddCurrentBlockForWrite: promote queue fail wait",
	10010004: "SimpleQueue.Add: mxBlockSaveWait lock queue a fail wait",
	10010005: "SimpleQueue.Add: chWaitSaveMeta fail wait",
	10010006: "SimpleQueue.Add: chWaitBlockSave fail wait",
	10010007: "SimpleQueueBlock.add: block IsUnload !!!!",
	// NOTE(review): "externat" looks like a typo for "external"; the text is
	// emitted at runtime, so it is intentionally left unchanged here.
	10010008: "SimpleQueueBlock.add: externat time in future ext time: %v now:%v",
	10010009: "SimpleQueue.Add: segment %v is out of valid segments",
	10010010: "SimpleQueue.Add: save mode %v is not allowed",

	10011000: "SimpleQueue.getBlockForNext: block RLock fail wait",
	10011001: "SimpleQueueBlock.getItemsAfter: block RLock fail wait",
	10011002: "SimpleQueue.Get: queue RLock fail wait",

	10012000: "SimpleQueue.Save: queue Lock FileSave mutex fail wait",
	10012001: "SimpleQueue.Save: queue RLock fail wait",
	10012002: "SimpleQueue.Save: queue marshal fail",
	10012003: "SimpleQueue.Save: file %v save fail",
	10012004: "SimpleQueue.Save: queue Lock fail wait",

	10013000: "SimpleQueueBlock.Save: block Lock FileSave mutex fail wait",
	10013001: "SimpleQueueBlock.Save: block RLock fail wait",
	10013002: "SimpleQueueBlock.Save: block.data marshal fail",
	10013003: "SimpleQueueBlock.Save: file %v save fail",
	10013004: "SimpleQueueBlock.Save: block Lock fail wait",

	10014000: "SimpleQueue.SaveAll: block Lock BlockSaveWait mutex fail wait",

	10015000: "SimpleQueueBlock.deleteBlock: block Lock FileSave mutex fail wait",
	10015001: "SimpleQueueBlock.deleteBlock: block RLock fail wait",
	10015002: "SimpleQueueBlock.deleteBlock: block Promote to Lock fail wait",
	10015003: "SimpleQueue.deleteBlock: queue Lock mutex fail wait",

	10016000: "SimpleQueue.getStorageLock: queue RLock fail wait",

	10017000: "SimpleQueueBlock.clearOldStorageBlock: block Lock FileSave mutex fail wait",
	10017001: "SimpleQueue.clearOldStorageBlock: queue Lock mutex fail wait",

	10018000: "SimpleQueueBlock.move: block Lock FileSave mutex fail wait",
	10018001: "SimpleQueueBlock.move: block RLock fail wait",
	10018002: "SimpleQueueBlock.move: block.data marshal fail",
	10018003: "SimpleQueueBlock.move: file %v save fail",
	10018004: "SimpleQueueBlock.move: queue Lock fail wait",
	10018005: "SimpleQueueBlock.move: block Promote to Lock fail wait",

	10019000: "SimpleQueueBlock.unload: block Lock FileSave mutex fail wait",
	10019001: "SimpleQueueBlock.unload: block Lock fail wait",

	10020000: "SimpleQueueBlock.load: block Lock FileSave mutex fail wait",
	10020001: "SimpleQueueBlock.load: block Promote to Lock fail wait",
	// NOTE(review): "Unmarchal" looks like a typo for "Unmarshal"; the text is
	// emitted at runtime, so it is intentionally left unchanged here.
	10020002: "SimpleQueueBlock.load: json Unmarchal Fail",
	10020003: "SimpleQueueBlock.load: load from storage Fail file name: %v, mark:%v",

	10021000: "SimpleQueue.SetUnload: queue RLock fail wait",

	10022000: "SimpleQueueBlock.setNewStorage: block Lock FileSave mutex fail wait",
	10022001: "SimpleQueueBlock.setNewStorage: queue Lock fail wait",
	10022002: "SimpleQueueBlock.setNewStorage: block Lock fail wait",

	10023000: "SimpleQueueBlock.setNeedDelete: block Lock FileSave mutex fail wait",
	10023001: "SimpleQueueBlock.setNeedDelete: queue Lock fail wait",
	10023002: "SimpleQueueBlock.setNeedDelete: block Lock fail wait",

	10024000: "SimpleQueue.SetMarks: queue RLock fail wait",

	10025000: "SimpleQueue.SetDelete: queue RLock fail wait",

	10026000: "SimpleQueue.UpdateMarks: queue RLock fail wait",
	10026001: "SimpleQueue.UpdateMarks: block RLock mutex fail wait",

	10027000: "SimpleQueue.searchMaxExtID: queue RLock fail wait",
	10027001: "SimpleQueue.searchMaxExtID: block RLock fail wait",

	10028000: "SimpleQueue.searchExtID: queue RLock fail wait",
	10028001: "SimpleQueue.searchExtID: block RLock fail wait",

	10029000: "SimpleQueue.AddUnique: externalID should be != 0",
	10029001: "SimpleQueue.AddUnique: queue Lock by source fail wait",

	10030000: "LoadSimpleQueue() (*SimpleQueue): unmarshal queue info error",
	10030001: "LoadSimpleQueue() (*SimpleQueue): unmarshal subscribers info error",

	10031000: "SimpleQueue.SaveSubscribers: queue subscribers Lock FileSave mutex fail wait",
	10031001: "SimpleQueue.SaveSubscribers: queue subscribers RLock fail wait",
	10031002: "SimpleQueue.SaveSubscribers: queue subscribers marshal fail",
	10031003: "SimpleQueue.SaveSubscribers: file %v save fail",
	10031004: "SimpleQueue.SaveSubscribers: queue Lock fail wait",

	10032000: "SimpleQueue.SubscriberSetLastRead: queue subscribers Lock fail wait",
	10032001: "SimpleQueue.SubscriberSetLastRead: chWait fail wait",
	10032002: "SimpleQueue.SubscriberSetLastRead: save mode %v is not allowed",

	10033000: "SimpleQueue.SubscriberGetLastRead: queue subscribers RLock fail wait",

	10033100: "SimpleQueue.SubscriberAddReplicaMember: queue subscribers Lock fail wait",
	10033101: "SimpleQueue.SubscriberAddReplicaMember: save fail",

	10033200: "SimpleQueue.SubscriberRemoveReplicaMember: queue subscribers Lock fail wait",
	10033201: "SimpleQueue.SubscriberRemoveReplicaMember: save fail",

	10033300: "SimpleQueue.SubscriberGetReplicaCount: queue subscribers RLock fail wait",
}

Errors - error codes and their descriptions

Functions

func GenerateError

func GenerateError(key int, a ...interface{}) *mft.Error

GenerateError -

func GenerateErrorE

func GenerateErrorE(key int, err error, a ...interface{}) *mft.Error

GenerateErrorE -

func SubscribeCopyUnique

func SubscribeCopyUnique(src Queue, dst Queue,
	userSrc cn.CapUser, userDst cn.CapUser,
	saveModeSrc cn.SaveMode, saveModeDst cn.SaveMode,
	subscriberName string, cntLimit int, doSaveDst bool,
	segments *segment.Segments,
) func(ctx context.Context) (isEmpty bool, err *mft.Error)

SubscribeCopyUnique subscribes to the source queue and copies its messages (via AddUniqueList) to the destination queue

Types

type Message

// Message is one message in the form accepted by Queue.AddList and
// Queue.AddUniqueList.
//
// Fields:
//   - ExternalID: id of the message in its source.
//   - ExternalDt: external time; documented on Queue.AddUnique as unix time.
//   - Message: the payload bytes.
//   - Source: name of the source of the message.
//   - Segment: segment number of the message.
//
// Every field except Message is dropped from JSON when it has its zero value
// (omitempty).
type Message struct {
	ExternalID int64  `json:"eid,omitempty"`
	ExternalDt int64  `json:"s_dt,omitempty"`
	Message    []byte `json:"msg"`
	Source     string `json:"src,omitempty"`
	Segment    int64  `json:"sg,omitempty"`
}

Message one message

func MJBToMessageList

func MJBToMessageList(msgs []MessageJsonBody) (out []Message)

type MessageJsonBody

// MessageJsonBody is a message with metadata whose body is held as raw JSON
// (json.RawMessage) rather than as []byte.
//
// It uses long JSON field names ("external_id", "message_ts", "message",
// "source", "segment") instead of the abbreviated ones used by Message and
// MessageWithMeta. ExternalID, ExternalDt, Source and Segment are dropped from
// JSON when they have their zero value (omitempty).
type MessageJsonBody struct {
	ID         int64           `json:"id"`
	Dt         time.Time       `json:"dt"`
	ExternalID int64           `json:"external_id,omitempty"`
	ExternalDt int64           `json:"message_ts,omitempty"`
	Message    json.RawMessage `json:"message"`
	Source     string          `json:"source,omitempty"`
	Segment    int64           `json:"segment,omitempty"`
}

MessageJsonBody with json body

func MWMToMessageJBList

func MWMToMessageJBList(msgs []*MessageWithMeta) (out []MessageJsonBody)

func (*MessageJsonBody) ToMessage

func (msg *MessageJsonBody) ToMessage() Message

type MessageOnlyMeta

// MessageOnlyMeta is the metadata of one message without its payload: it has
// the same fields and JSON names as MessageWithMeta except for Message.
//
// ExternalID, ExternalDt, Source and Segment are dropped from JSON when they
// have their zero value (omitempty); ID, Dt and IsSaved are always written.
type MessageOnlyMeta struct {
	ID int64 `json:"id"`
	// ExternalID - source id; when it is 0 it is treated as equal to ID
	ExternalID int64     `json:"eid,omitempty"`
	ExternalDt int64     `json:"s_dt,omitempty"`
	Dt         time.Time `json:"dt"`
	Source     string    `json:"src,omitempty"`
	IsSaved    bool      `json:"is_saved"`
	Segment    int64     `json:"sg,omitempty"`
}

MessageOnlyMeta one message only meta

type MessageWithMeta

// MessageWithMeta is one message together with its metadata; it is the element
// type returned by Queue.Get and Queue.GetSegment.
//
// ExternalID, ExternalDt, Source and Segment are dropped from JSON when they
// have their zero value (omitempty); ID, Dt, Message and IsSaved are always
// written.
type MessageWithMeta struct {
	ID int64 `json:"id"`
	// ExternalID - source id; when it is 0 it is treated as equal to ID
	ExternalID int64     `json:"eid,omitempty"`
	ExternalDt int64     `json:"s_dt,omitempty"`
	Dt         time.Time `json:"dt"`
	Message    []byte    `json:"msg"`
	Source     string    `json:"src,omitempty"`
	IsSaved    bool      `json:"is_saved"`
	Segment    int64     `json:"sg,omitempty"`
}

MessageWithMeta one message with meta

func (*MessageWithMeta) CopyOM

func (msg *MessageWithMeta) CopyOM() *MessageOnlyMeta

CopyOM copies the message metadata into a new MessageOnlyMeta

func (*MessageWithMeta) ToMessage

func (msg *MessageWithMeta) ToMessage() Message

ToMessage converts the message to a Message

func (*MessageWithMeta) ToMessageJB

func (msg *MessageWithMeta) ToMessageJB() MessageJsonBody

type Queue

// Queue is a queue of messages. It covers adding messages (plain or unique by
// external ID), reading them by ID, saving pending state, and tracking
// subscribers' last-read positions and replica membership.
//
// Every method takes a context and the acting user, and reports failure as
// *mft.Error.
type Queue interface {
	// Add adds a message to the queue
	// externalDt is unix time
	// externalID is source id (0 means not set)
	Add(ctx context.Context, user cn.CapUser, message []byte,
		externalID int64, externalDt int64, source string, segment int64,
		saveMode cn.SaveMode) (id int64, err *mft.Error)
	// AddList adds several messages to the queue and returns their ids
	// (presumably one id per message, in input order - TODO confirm)
	AddList(ctx context.Context, user cn.CapUser, messages []Message,
		saveMode cn.SaveMode) (ids []int64, err *mft.Error)

	// Get - gets at most cntLimit messages from the queue with id greater than idStart
	// returns messages == nil when there are no elements
	Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*MessageWithMeta, err *mft.Error)

	// GetSegment - gets at most cntLimit messages from the queue with id greater than idStart
	// returns messages == nil when there are no elements
	// only messages that are in segments are returned
	// lastId is the ID of the last message read from the queue
	GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int,
		segments *segment.Segments,
	) (messages []*MessageWithMeta, lastId int64, err *mft.Error)

	// SaveAll saves all blocks waiting for save, the metadata and anything else pending
	SaveAll(ctx context.Context, user cn.CapUser) (err *mft.Error)

	// AddUnique adds a message to the queue
	// externalDt is unix time
	// externalID is source id (must be != 0)
	AddUnique(ctx context.Context, user cn.CapUser, message []byte,
		externalID int64, externalDt int64, source string, segment int64,
		saveMode cn.SaveMode) (id int64, err *mft.Error)
	// AddUniqueList is the list form of AddUnique and returns the ids
	// (presumably one id per message, in input order - TODO confirm)
	AddUniqueList(ctx context.Context, user cn.CapUser, messages []Message,
		saveMode cn.SaveMode) (ids []int64, err *mft.Error)

	// SubscriberSetLastRead - sets the last-read info of a subscriber
	// if id == 0 the subscription is removed
	SubscriberSetLastRead(ctx context.Context, user cn.CapUser,
		subscriber string, id int64,
		saveMode cn.SaveMode) (err *mft.Error)

	// SubscriberGetLastRead - gets the last-read info of a subscriber
	SubscriberGetLastRead(ctx context.Context, user cn.CapUser, subscriber string) (id int64, err *mft.Error)

	// SubscriberAddReplicaMember - adds a member to the replica. The replica is a group used to control replication
	SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)

	// SubscriberRemoveReplicaMember - removes a member from the replica. The replica is a group used to control replication
	SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)

	// SubscriberGetReplicaCount - gets how many members of the replica have got the message with this id. The replica is a group used to control replication
	SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser, id int64) (cnt int, err *mft.Error)
}

Queue - queue of messages

type SimpleQueue

// SimpleQueue is the MVP queue: a slice of blocks plus the storages, limits,
// subscribers and save bookkeeping that belong to it.
//
// Only Blocks, CntLimit, TimeLimit, LenLimit, Segments, DefaultSaveMode and
// UseDefaultSaveModeForce have JSON names; every other exported field is
// tagged json:"-" and is therefore not part of the JSON encoding of this
// struct.
type SimpleQueue struct {
	// Blocks - blocks of the queue
	Blocks []*SimpleQueueBlock `json:"blocks"`

	// MetaStorage - storage for metadata (block info)
	MetaStorage storage.Storage `json:"-"`

	// SubscriberStorage - storage for subscriber info
	SubscriberStorage storage.Storage `json:"-"`

	// MarkerBlockDataStorage - storages for block data, keyed by marker
	// when nil, MetaStorage is used
	MarkerBlockDataStorage map[string]storage.Storage `json:"-"`

	// IDGenerator - id generator (presumably the source of message/block ids - TODO confirm)
	IDGenerator *mft.G `json:"-"`

	// CntLimit, TimeLimit, LenLimit - count, time and length limits
	// (presumably per-block limits that trigger a new block - TODO confirm)
	CntLimit  int           `json:"cnt_limit,omitempty"`
	TimeLimit time.Duration `json:"time_limit,omitempty"`
	LenLimit  int           `json:"len_limit,omitempty"`

	// ChangesRv, SaveRv - revision counters; Save is documented to do nothing
	// when SaveRv == ChangesRv
	ChangesRv int64 `json:"-"`
	SaveRv    int64 `json:"-"`

	// SaveWait - channels of callers waiting for a save (presumably signalled when the save is done - TODO confirm)
	SaveWait []chan bool `json:"-"`

	// SaveBlocks - blocks keyed by int64 (presumably blocks waiting for save, keyed by block ID - TODO confirm)
	SaveBlocks map[int64]*SimpleQueueBlock `json:"-"`

	// Subscribers - subscribers info of the queue
	Subscribers *SimpleQueueSubscribers `json:"-"`

	// Source - source name of the queue
	Source string `json:"-"`

	// Segments - segments of the queue; omitted from JSON when nil
	Segments *segment.Segments `json:"segments,omitempty"`

	// DefaultSaveMode, UseDefaultSaveModeForce - default save mode settings
	DefaultSaveMode         cn.SaveMode `json:"default_save_mod,omitempty"`
	UseDefaultSaveModeForce bool        `json:"use_default_save_mod_force,omitempty"`
	// contains filtered or unexported fields
}

SimpleQueue - queue MVP

func CreateSimpleQueue

func CreateSimpleQueue(cntLimit int, timeLimit time.Duration, lenLimit int,
	metaStorage storage.Storage, subscriberStorage storage.Storage,
	markerBlockDataStorage map[string]storage.Storage, idGenerator *mft.G) *SimpleQueue

CreateSimpleQueue creates a SimpleQueue. MetaStorage is the storage for metadata (block info). MarkerBlockDataStorage is the storage for block data, keyed by marker; when it is nil, MetaStorage is used.

func LoadSimpleQueue

func LoadSimpleQueue(ctx context.Context, metaStorage storage.Storage, subscriberStorage storage.Storage,
	markerBlockDataStorage map[string]storage.Storage,
	idGenerator *mft.G) (q *SimpleQueue, err *mft.Error)

LoadSimpleQueue load queue from storage

func (*SimpleQueue) Add

func (q *SimpleQueue) Add(ctx context.Context, user cn.CapUser, message []byte,
	externalID int64, externalDt int64, source string, segment int64,
	saveMode cn.SaveMode) (id int64, err *mft.Error)

Add adds a message to the queue. externalDt is unix time. externalID is the source id; 0 means it is not set.

func (*SimpleQueue) AddList

func (q *SimpleQueue) AddList(ctx context.Context, user cn.CapUser, messages []Message,
	saveMode cn.SaveMode) (ids []int64, err *mft.Error)

func (*SimpleQueue) AddUnique

func (q *SimpleQueue) AddUnique(ctx context.Context, user cn.CapUser, message []byte,
	externalID int64, externalDt int64, source string, segment int64,
	saveMode cn.SaveMode) (id int64, err *mft.Error)

AddUnique adds a message to the queue. externalDt is unix time. externalID is the source id and must be != 0.

func (*SimpleQueue) AddUniqueList

func (q *SimpleQueue) AddUniqueList(ctx context.Context, user cn.CapUser, messages []Message,
	saveMode cn.SaveMode) (ids []int64, err *mft.Error)

func (*SimpleQueue) DeleteBlocks

func (q *SimpleQueue) DeleteBlocks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)

DeleteBlocks deletes blocks that have NeedDelete == true. blocksCount = 0 means unlimited.

func (*SimpleQueue) Get

func (q *SimpleQueue) Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*MessageWithMeta, err *mft.Error)

Get - gets at most cntLimit messages from the queue with id greater than idStart. Returns messages == nil when there are no elements.

func (*SimpleQueue) GetSegment

func (q *SimpleQueue) GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int,
	segments *segment.Segments,
) (messages []*MessageWithMeta, lastId int64, err *mft.Error)

GetSegment - gets at most cntLimit messages from the queue with id greater than idStart. Returns messages == nil when there are no elements. Only messages that are in segments are returned. lastId is the ID of the last message read from the queue.

func (*SimpleQueue) Mutex

func (q *SimpleQueue) Mutex() *mfs.PMutex

Mutex - returns Mutex

func (*SimpleQueue) MutexBlockSaveWait

func (q *SimpleQueue) MutexBlockSaveWait() *mfs.PMutex

MutexBlockSaveWait - returns Mutex for block save Wait

func (*SimpleQueue) MutexFileSave

func (q *SimpleQueue) MutexFileSave() *mfs.PMutex

MutexFileSave - returns Mutex for FileSave

func (*SimpleQueue) Save

func (q *SimpleQueue) Save(ctx context.Context, user cn.CapUser) (err *mft.Error)

Save saves the meta info of the queue. When MetaStorage == nil it returns nil. When SaveRv == ChangesRv it does nothing and returns nil.

func (*SimpleQueue) SaveAll

func (q *SimpleQueue) SaveAll(ctx context.Context, user cn.CapUser) (err *mft.Error)

SaveAll save all waiting for save block and metadata

func (*SimpleQueue) SaveSubscribers

func (q *SimpleQueue) SaveSubscribers(ctx context.Context, user cn.CapUser) (err *mft.Error)

SaveSubscribers saves the subscribers info of the queue. When MetaStorage == nil it returns nil. When SaveRv == ChangesRv it does nothing and returns nil.

func (*SimpleQueue) SetDelete

func (q *SimpleQueue) SetDelete(ctx context.Context, user cn.CapUser, setNeedDelete func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needDelete bool, err *mft.Error)) (err *mft.Error)

SetDelete - finds and marks blocks for deletion. The queue needs to be saved (q.save(ctx)) after it is done. It finds the first block for which the func returns false; all blocks before that block are marked as needing deletion.

func (*SimpleQueue) SetMarks

func (q *SimpleQueue) SetMarks(ctx context.Context, user cn.CapUser, setBlockMark func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needSetMark bool, nextMark string, err *mft.Error)) (err *mft.Error)

SetMarks - finds blocks and sets their marks. The queue needs to be saved (q.save(ctx)) after it is done.

func (*SimpleQueue) SetMaxExtID

func (q *SimpleQueue) SetMaxExtID(source string, extID int64)

SetMaxExtID - set max external id

func (*SimpleQueue) SetUnload

func (q *SimpleQueue) SetUnload(ctx context.Context, user cn.CapUser, setBlockUnload func(ctx context.Context, i int, len int, q *SimpleQueue, block *SimpleQueueBlock) (needUnload bool, err *mft.Error)) (err *mft.Error)

SetUnload - find and set blocks as unload

func (*SimpleQueue) SubscriberAddReplicaMember

func (q *SimpleQueue) SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)

SubscriberAddReplicaMember - add replication subscriber member

func (*SimpleQueue) SubscriberGetLastRead

func (q *SimpleQueue) SubscriberGetLastRead(ctx context.Context, user cn.CapUser, subscriber string) (id int64, err *mft.Error)

SubscriberGetLastRead - get last read info

func (*SimpleQueue) SubscriberGetReplicaCount

func (q *SimpleQueue) SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser, id int64) (cnt int, err *mft.Error)

SubscriberGetReplicaCount - get count of replication subscriber member

func (*SimpleQueue) SubscriberRemoveReplicaMember

func (q *SimpleQueue) SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser, subscriber string) (err *mft.Error)

SubscriberRemoveReplicaMember - remove replication subscriber member

func (*SimpleQueue) SubscriberSetLastRead

func (q *SimpleQueue) SubscriberSetLastRead(ctx context.Context, user cn.CapUser,
	subscriber string, id int64,
	saveMode cn.SaveMode) (err *mft.Error)

SubscriberSetLastRead - set last read info if id == 0 remove subscribe

func (*SimpleQueue) UpdateMarks

func (q *SimpleQueue) UpdateMarks(ctx context.Context, user cn.CapUser, blocksCount int) (err *mft.Error)

UpdateMarks moves blocks from storage to storage and clears the old storages. The queue needs to be saved (q.save(ctx)) after it is done. blocksCount = 0 means unlimited.

type SimpleQueueBlock

// SimpleQueueBlock is a block of queue data.
//
// ID, Dt, Mark, RemoveMarks, NextMark, NeedDelete and Len have JSON names and
// form the block info; Data and the remaining bookkeeping fields are tagged
// json:"-" and are not part of the JSON encoding of this struct.
type SimpleQueueBlock struct {
	// Mark / NextMark are marker names (SimpleQueue.MarkerBlockDataStorage is
	// keyed by marker); NeedDelete flags the block for DeleteBlocks.
	// Len - presumably the number of messages in the block - TODO confirm
	ID          int64     `json:"id"`
	Dt          time.Time `json:"dt"`
	Mark        string    `json:"mark"`
	RemoveMarks []string  `json:"rm_marks,omitempty"`
	NextMark    string    `json:"next_mark"`
	NeedDelete  bool      `json:"need_delete"`
	Len         int       `json:"len"`

	// Data - messages of the block
	Data []*SimpleQueueMessage `json:"-"`

	// ChangesRv, SaveRv - revision counters; Save is documented to do nothing
	// when block.SaveRv == block.ChangesRv
	ChangesRv int64 `json:"-"`
	SaveRv    int64 `json:"-"`

	// IsUnload - the block is marked as unloaded
	IsUnload bool `json:"-"`

	// SaveWait - channels of callers waiting for a save (presumably signalled when the save is done - TODO confirm)
	SaveWait []chan bool `json:"-"`

	// May use without lock
	LastGet time.Time `json:"-"`
	// contains filtered or unexported fields
}

SimpleQueueBlock block with data

func (*SimpleQueueBlock) Mutex

func (block *SimpleQueueBlock) Mutex() *mfs.PMutex

Mutex - returns Mutex

func (*SimpleQueueBlock) MutexFileSave

func (block *SimpleQueueBlock) MutexFileSave() *mfs.PMutex

MutexFileSave - returns Mutex for FileSave

func (*SimpleQueueBlock) Save

func (block *SimpleQueueBlock) Save(ctx context.Context, q *SimpleQueue) (err *mft.Error)

Save saves the block of the queue. When q.MetaStorage == nil it returns nil. When block.SaveRv == block.ChangesRv it does nothing and returns nil.

func (*SimpleQueueBlock) Unload

func (block *SimpleQueueBlock) Unload(ctx context.Context, q *SimpleQueue) (isNotSave bool, err *mft.Error)

Unload - unload block from SimpleQueue

type SimpleQueueMessage

// SimpleQueueMessage is one message as held in SimpleQueueBlock.Data.
//
// It has the same fields as MessageWithMeta except IsSaved, and its Message
// payload is dropped from JSON when empty (omitempty), as are ExternalID,
// ExternalDt, Source and Segment.
type SimpleQueueMessage struct {
	ID int64 `json:"id"`
	// ExternalID - source id; when it is 0 it is treated as equal to ID
	ExternalID int64     `json:"eid,omitempty"`
	ExternalDt int64     `json:"s_dt,omitempty"`
	Dt         time.Time `json:"dt"`
	Message    []byte    `json:"msg,omitempty"`
	Source     string    `json:"src,omitempty"`
	Segment    int64     `json:"sg,omitempty"`
}

SimpleQueueMessage one message

func (*SimpleQueueMessage) CopyWM

func (msg *SimpleQueueMessage) CopyWM() *MessageWithMeta

CopyWM copies the message into a new MessageWithMeta

type SimpleQueueSubscriberInfo

// SimpleQueueSubscriberInfo is the info about one subscriber.
//
// LastID is the subscriber's last id (presumably the last-read message id set
// through SubscriberSetLastRead - TODO confirm). StartDt and LastDt are
// timestamps (presumably of the first and the latest update - TODO confirm).
type SimpleQueueSubscriberInfo struct {
	LastID  int64     `json:"last_id"`
	StartDt time.Time `json:"start_dt"`
	LastDt  time.Time `json:"last_dt"`
}

SimpleQueueSubscriberInfo - info about one subscriber

type SimpleQueueSubscribers

// SimpleQueueSubscribers is the subscribers info of a queue.
//
// Only SubscribersInfo ("si") and ReplicaSubscribers ("rs") are part of the
// JSON encoding, and each is omitted when empty; the revision counters and
// SaveWait are tagged json:"-".
type SimpleQueueSubscribers struct {
	// ChangesRv, SaveRv - revision counters; SaveSubscribers is documented to
	// do nothing when SaveRv == ChangesRv
	ChangesRv int64 `json:"-"`
	SaveRv    int64 `json:"-"`

	// SaveWait - channels of callers waiting for a save (presumably signalled when the save is done - TODO confirm)
	SaveWait []chan bool `json:"-"`

	// SubscribersInfo - info per subscriber, keyed by string (presumably the subscriber name - TODO confirm)
	// ReplicaSubscribers - set of replica members, keyed by string (presumably the subscriber name - TODO confirm)
	SubscribersInfo    map[string]*SimpleQueueSubscriberInfo `json:"si,omitempty"`
	ReplicaSubscribers map[string]struct{}                   `json:"rs,omitempty"`
	// contains filtered or unexported fields
}

SimpleQueueSubscribers line subscribers info

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL