msg_tracker

package
v0.0.2
Published: Nov 7, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckedResult

type AckedResult struct {
	// Maybe should be an enum?
	Dup   bool
	Acked bool
}
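
The field comment above wonders whether the two flags should be an enum. As a minimal sketch of what that could look like on the caller side, the block below collapses the flags into one value. AckState, its constants, and toState are hypothetical names that are not part of this package, and the meaning assigned to each flag (Acked for a first acknowledgment, Dup for a repeat) is an assumption, since the documentation does not spell it out.

```go
package main

import "fmt"

// AckedResult mirrors the documented struct. It is redeclared locally so the
// sketch compiles on its own.
type AckedResult struct {
	Dup   bool
	Acked bool
}

// AckState is a hypothetical enum, not part of the package.
type AckState int

const (
	AckUnknown   AckState = iota // neither flag set
	AckFirst                     // assumed: first acknowledgment of the ID
	AckDuplicate                 // assumed: the ID had already been acknowledged
)

// toState collapses the two flags into one value. Dup is checked first, so a
// result with both flags set is still reported as a duplicate.
func toState(r AckedResult) AckState {
	switch {
	case r.Dup:
		return AckDuplicate
	case r.Acked:
		return AckFirst
	default:
		return AckUnknown
	}
}

func main() {
	fmt.Println(toState(AckedResult{Acked: true}) == AckFirst)
	fmt.Println(toState(AckedResult{Dup: true}) == AckDuplicate)
}
```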

type GeneratorReport

type GeneratorReport struct {
	Unacked          uint
	TotalAcked       uint
	TotalDuped       uint
	OldestUnackedAge time.Time
}

GeneratorReport contains statistics for a single generator.

type MessageRange

type MessageRange struct {
	sync.RWMutex
	StartID        uint64
	RangeLen       uint
	Timestamp      time.Time
	AckedCount     uint // Number of unique messages acked
	DuplicateCount uint // Number of duplicate acks received
	// contains filtered or unexported fields
}

MessageRange represents a range of message IDs with a bitmask for tracking acknowledgments.

func NewMessageRange

func NewMessageRange(startID uint64, rangeLen uint) *MessageRange

NewMessageRange creates a new message range.

func (*MessageRange) Ack

func (mr *MessageRange) Ack(msgID uint64) (AckedResult, bool)

Ack marks a message ID as acknowledged. It returns (AckedResult, success), where success reports whether the message ID was in range.
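
To make the return values concrete, here is a minimal sketch of a bitmask-backed range with an Ack method of the same shape. It is not the package's implementation: rangeSketch, newRangeSketch, and the bits field are hypothetical, locking is omitted (the real type embeds sync.RWMutex), and the choice to set only Dup on a repeated ack is an assumption.

```go
package main

import "fmt"

// AckedResult mirrors the documented struct.
type AckedResult struct {
	Dup   bool
	Acked bool
}

// rangeSketch is a hypothetical stand-in for MessageRange.
type rangeSketch struct {
	StartID        uint64
	RangeLen       uint
	AckedCount     uint
	DuplicateCount uint
	bits           []uint64 // one bit per message ID in the range
}

func newRangeSketch(startID uint64, rangeLen uint) *rangeSketch {
	return &rangeSketch{
		StartID:  startID,
		RangeLen: rangeLen,
		bits:     make([]uint64, (rangeLen+63)/64),
	}
}

// Ack sets the bit for msgID. The second return value is false when msgID
// falls outside [StartID, StartID+RangeLen).
func (r *rangeSketch) Ack(msgID uint64) (AckedResult, bool) {
	if msgID < r.StartID || msgID-r.StartID >= uint64(r.RangeLen) {
		return AckedResult{}, false
	}
	off := msgID - r.StartID
	word, mask := off/64, uint64(1)<<(off%64)
	if r.bits[word]&mask != 0 {
		// Bit already set: count the duplicate, leave AckedCount alone.
		r.DuplicateCount++
		return AckedResult{Dup: true}, true
	}
	r.bits[word] |= mask
	r.AckedCount++
	return AckedResult{Acked: true}, true
}

func main() {
	r := newRangeSketch(100, 10)
	first, ok := r.Ack(103)
	fmt.Println(first.Acked, ok)
	again, ok := r.Ack(103)
	fmt.Println(again.Dup, ok)
	_, ok = r.Ack(110) // one past the end of the range
	fmt.Println(ok)
}
```

Keeping one bit per ID means a duplicate is detected with a single word lookup, and the two counters can be maintained without rescanning the bitmask.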

func (*MessageRange) GetTimestamp

func (mr *MessageRange) GetTimestamp() time.Time

func (*MessageRange) IsAcked

func (mr *MessageRange) IsAcked(msgID uint64) bool

IsAcked checks if a message ID has been acknowledged.

func (*MessageRange) OlderThan

func (mr *MessageRange) OlderThan(timestamp time.Time) bool

func (*MessageRange) TotalAckedCount

func (mr *MessageRange) TotalAckedCount() uint

func (*MessageRange) TotalMessages

func (mr *MessageRange) TotalMessages() uint

TotalMessages returns the total number of messages in the range.

func (*MessageRange) UnackedCount

func (mr *MessageRange) UnackedCount() uint

UnackedCount returns the number of messages that have not been acknowledged.

func (*MessageRange) UpdateRangeLen

func (mr *MessageRange) UpdateRangeLen(rangeLen uint)

UpdateRangeLen updates the length of the message range. It does not resize the bitmap.
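
The effect of changing the length without touching the bitmap can be sketched with the two counters alone. lenSketch is a hypothetical stand-in, and computing the unacked count as RangeLen minus AckedCount, clamped at zero, is an assumption about how the counts relate rather than the package's documented behavior.

```go
package main

import "fmt"

// lenSketch is a hypothetical stand-in that keeps only the counters.
type lenSketch struct {
	RangeLen   uint
	AckedCount uint
}

// UpdateRangeLen overwrites the length and nothing else.
func (s *lenSketch) UpdateRangeLen(n uint) { s.RangeLen = n }

// UnackedCount is assumed to be RangeLen - AckedCount. The clamp guards
// against unsigned underflow if the range is shrunk below the acked count.
func (s *lenSketch) UnackedCount() uint {
	if s.AckedCount >= s.RangeLen {
		return 0
	}
	return s.RangeLen - s.AckedCount
}

func main() {
	s := &lenSketch{RangeLen: 1000, AckedCount: 400}
	fmt.Println(s.UnackedCount()) // 600

	s.UpdateRangeLen(400) // e.g. the sender stopped after 400 messages
	fmt.Println(s.UnackedCount()) // 0
}
```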

func (*MessageRange) UpdateTimestamp

func (mr *MessageRange) UpdateTimestamp(timestamp time.Time)

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker is the main message tracking service.

func NewTracker

func NewTracker(log *zap.Logger) *Tracker

NewTracker creates a new message tracker.

func (*Tracker) Ack

func (t *Tracker) Ack(generatorID string, startRangeID uint64, rangeLen uint, msgID uint64) bool

Ack acknowledges a message ID within a specific range for a generator.

func (*Tracker) AddRange

func (t *Tracker) AddRange(generatorID string, startRangeID uint64, rangeLen uint, timestamp time.Time)

AddRange adds a message range for a generator without acking any messages. The timestamp is recorded for the range. If the range already exists, its timestamp is updated.

func (*Tracker) GeneratorReport

func (t *Tracker) GeneratorReport(timestamp time.Time) map[string]GeneratorReport

GeneratorReport returns a report for each generator containing statistics about unacked messages, total acked messages, duplicates, and the oldest unacked timestamp. The unacked count only includes ranges with timestamps before the given timestamp.

func (*Tracker) UpdateRange

func (t *Tracker) UpdateRange(generatorID string, startRangeID uint64, rangeLen uint)

UpdateRange updates the length of an existing message range. This is typically called when a load generator exits before sending the entire range of messages. By updating the range length, we ensure that unsent messages are not counted as unacked. Note: This only updates RangeLen; the bitmap is not resized.
