messages

package
v0.0.0-...-e78f20e
Warning

This package is not in the latest version of its module.

Published: Jul 14, 2023 License: MIT Imports: 17 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func FormatBoundTotals

func FormatBoundTotals(boundsSlice []Bounds) string

FormatBoundTotals makes a pretty table from the totals across all partition results from a GetAllPartitionBounds call.

func FormatBounds

func FormatBounds(boundsSlice []Bounds) string

FormatBounds makes a pretty table from the results of a GetAllPartitionBounds call.

func FormatTailStats

func FormatTailStats(stats TailStats, filtered bool) string

FormatTailStats generates a pretty table from a TailStats instance.

Types

type Bounds

type Bounds struct {
	Partition   int
	FirstTime   time.Time
	FirstOffset int64
	LastTime    time.Time
	LastOffset  int64
}

Bounds represents the start and end "bounds" of the messages in a partition.
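As a rough illustration of what can be derived from a Bounds value, the sketch below estimates the number of messages and the time span in a partition. The struct is a local copy of the one documented above so that the example compiles on its own; messageCount and timeSpan are hypothetical helpers, not part of this package. The count assumes contiguous offsets and an inclusive LastOffset, so for compacted topics it is only an upper bound.

```go
package main

import (
	"fmt"
	"time"
)

// Bounds is a local copy of the documented struct, used here so the sketch
// does not need to import the package.
type Bounds struct {
	Partition   int
	FirstTime   time.Time
	FirstOffset int64
	LastTime    time.Time
	LastOffset  int64
}

// messageCount is a hypothetical helper. It assumes offsets are contiguous
// and that LastOffset is inclusive, so the count is last - first + 1.
func messageCount(b Bounds) int64 {
	if b.LastOffset < b.FirstOffset {
		return 0
	}
	return b.LastOffset - b.FirstOffset + 1
}

// timeSpan is a hypothetical helper returning the time between the first
// and last message in the partition.
func timeSpan(b Bounds) time.Duration {
	return b.LastTime.Sub(b.FirstTime)
}

func main() {
	start := time.Date(2023, 7, 14, 0, 0, 0, 0, time.UTC)
	b := Bounds{
		Partition:   0,
		FirstTime:   start,
		FirstOffset: 100,
		LastTime:    start.Add(90 * time.Second),
		LastOffset:  249,
	}
	fmt.Println(messageCount(b), timeSpan(b)) // prints "150 1m30s"
}
```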

func GetAllPartitionBounds

func GetAllPartitionBounds(
	ctx context.Context,
	connector *admin.Connector,
	topic string,
	baseOffsets map[int]int64,
) ([]Bounds, error)

GetAllPartitionBounds gets the bounds for all partitions in the argument topic. The start of each bound is taken from the baseOffsets map or, if the map is nil, from the starting offset of each topic partition.
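The baseOffsets rule can be sketched as a small lookup. resolveStartOffset is a hypothetical helper written for illustration, not a function in this package, and the handling of a partition that is missing from a non-nil map is an assumption, since the description only covers the nil case.

```go
package main

import "fmt"

// resolveStartOffset is a hypothetical helper: it uses the entry in
// baseOffsets when the map is non-nil, and otherwise falls back to the
// partition's own starting offset.
func resolveStartOffset(baseOffsets map[int]int64, partition int, partitionStart int64) int64 {
	if baseOffsets == nil {
		return partitionStart
	}
	if offset, ok := baseOffsets[partition]; ok {
		return offset
	}
	// Assumption: a partition missing from a non-nil map also falls back.
	return partitionStart
}

func main() {
	base := map[int]int64{0: 500}
	fmt.Println(resolveStartOffset(base, 0, 100)) // prints 500
	fmt.Println(resolveStartOffset(base, 1, 100)) // prints 100
	fmt.Println(resolveStartOffset(nil, 0, 100))  // prints 100
}
```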

func GetPartitionBounds

func GetPartitionBounds(
	ctx context.Context,
	connector *admin.Connector,
	topic string,
	partition int,
	minOffset int64,
) (Bounds, error)

GetPartitionBounds gets the bounds for a single partition in the argument topic. It does this by dialing the leader of the partition and then reading the first and last messages. If the provided minOffset is greater than the partition's first offset, minOffset is used as the start of the bounds instead of the actual first offset.
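The minOffset rule amounts to taking the larger of the two offsets. A minimal sketch, using a hypothetical effectiveFirstOffset helper that is not part of this package:

```go
package main

import "fmt"

// effectiveFirstOffset is a hypothetical helper: when minOffset is greater
// than the partition's actual first offset, minOffset is used instead.
func effectiveFirstOffset(actualFirst, minOffset int64) int64 {
	if minOffset > actualFirst {
		return minOffset
	}
	return actualFirst
}

func main() {
	fmt.Println(effectiveFirstOffset(100, 250)) // prints 250
	fmt.Println(effectiveFirstOffset(100, 0))   // prints 100
}
```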

type TailMessage

type TailMessage struct {
	Message   kafka.Message
	Partition int
	Err       error
}

TailMessage represents a single message retrieved from a Kafka reader.

type TailPartitionStats

type TailPartitionStats struct {
	TotalErrors               int
	TotalMessages             int
	TotalMessageBytes         int64
	TotalMessagesFiltered     int
	TotalMessageBytesFiltered int64
	FirstOffset               int64
	FirstTime                 time.Time
	LastOffset                int64
	LastTime                  time.Time
}

TailPartitionStats stores stats on the fetches from a single topic partition.

type TailStats

type TailStats struct {
	PartitionStats map[int]*TailPartitionStats
}

TailStats stores stats on all partitions that are tailed.
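Because the stats are kept per partition, a caller who wants overall figures has to sum them. The sketch below shows one way to do that. partitionStats and tailStats are trimmed local stand-ins for TailPartitionStats and TailStats, and sumStats is a hypothetical helper, not a function in this package.

```go
package main

import "fmt"

// partitionStats is a trimmed stand-in for TailPartitionStats that keeps
// only the counters summed below.
type partitionStats struct {
	TotalErrors       int
	TotalMessages     int
	TotalMessageBytes int64
}

// tailStats is a stand-in for TailStats, keyed by partition ID.
type tailStats struct {
	PartitionStats map[int]*partitionStats
}

// sumStats is a hypothetical helper that adds up the per-partition counters.
func sumStats(stats tailStats) partitionStats {
	var total partitionStats
	for _, p := range stats.PartitionStats {
		if p == nil {
			continue // skip partitions with no recorded stats
		}
		total.TotalErrors += p.TotalErrors
		total.TotalMessages += p.TotalMessages
		total.TotalMessageBytes += p.TotalMessageBytes
	}
	return total
}

func main() {
	stats := tailStats{PartitionStats: map[int]*partitionStats{
		0: {TotalMessages: 10, TotalMessageBytes: 2048},
		1: {TotalMessages: 5, TotalMessageBytes: 512, TotalErrors: 1},
	}}
	fmt.Printf("%+v\n", sumStats(stats))
	// prints "{TotalErrors:1 TotalMessages:15 TotalMessageBytes:2560}"
}
```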

type TopicTailer

type TopicTailer struct {
	Connector *admin.Connector
	// contains filtered or unexported fields
}

TopicTailer fetches a stream of messages from a topic.

func NewTopicTailer

func NewTopicTailer(
	Connector *admin.Connector,
	topic string,
	partitions []int,
	offset int64,
	minBytes int,
	maxBytes int,
) *TopicTailer

NewTopicTailer returns a new TopicTailer instance.

func (*TopicTailer) GetMessages

func (t *TopicTailer) GetMessages(
	ctx context.Context,
	messagesChan chan TailMessage,
)

GetMessages gets a stream of messages from the tailer. These are passed back through the argument channel.
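The channel-based hand-off can be sketched without a Kafka cluster. In the example below, tailMessage is a simplified stand-in for TailMessage (the real struct carries a kafka.Message), and produce is a hypothetical substitute for GetMessages. Whether GetMessages itself blocks is not stated here, so the sketch runs the producer in its own goroutine and stops it by cancelling the context.

```go
package main

import (
	"context"
	"fmt"
)

// tailMessage is a simplified stand-in for TailMessage; the real struct
// carries a kafka.Message rather than a raw byte slice.
type tailMessage struct {
	Value     []byte
	Partition int
	Err       error
}

// produce is a hypothetical stand-in for GetMessages: it pushes messages
// into the channel until the context is cancelled.
func produce(ctx context.Context, out chan tailMessage) {
	for i := 0; ; i++ {
		msg := tailMessage{
			Value:     []byte(fmt.Sprintf("msg-%d", i)),
			Partition: i % 2,
		}
		select {
		case <-ctx.Done():
			return
		case out <- msg:
		}
	}
}

// collect reads up to max messages from the channel; the deferred cancel
// stops the producer once collect returns.
func collect(max int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	messages := make(chan tailMessage)
	go produce(ctx, messages)

	var values []string
	for len(values) < max {
		msg := <-messages
		if msg.Err != nil {
			continue // a real caller would likely log or count the error
		}
		values = append(values, string(msg.Value))
	}
	return values
}

func main() {
	fmt.Println(collect(3)) // prints "[msg-0 msg-1 msg-2]"
}
```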

func (*TopicTailer) LogMessages

func (t *TopicTailer) LogMessages(
	ctx context.Context,
	maxMessages int,
	filterRegexp string,
	raw bool,
	headers bool,
) (TailStats, error)

LogMessages logs out the message stream from the tailer. It runs until the context is cancelled or maxMessages messages have been tailed, then returns stats from the tail run that the caller can display.
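The interaction between maxMessages and filterRegexp can be sketched over plain strings. Everything in this example is an assumption made for illustration: tailSummary and tailValues are hypothetical names, the filter is applied to the message value, non-matching messages are counted as filtered and do not count toward the limit, and a limit of zero or less means no limit. The package's actual behaviour may differ on each of these points.

```go
package main

import (
	"fmt"
	"regexp"
)

// tailSummary is a hypothetical, trimmed-down analogue of TailPartitionStats.
type tailSummary struct {
	TotalMessages         int
	TotalMessagesFiltered int
}

// tailValues is a hypothetical sketch of a tail loop. It prints matching
// values until maxMessages have been printed, and returns an error if the
// filter expression does not compile.
func tailValues(values []string, maxMessages int, filterRegexp string) (tailSummary, error) {
	var summary tailSummary

	var filter *regexp.Regexp
	if filterRegexp != "" {
		compiled, err := regexp.Compile(filterRegexp)
		if err != nil {
			return summary, err
		}
		filter = compiled
	}

	for _, value := range values {
		// Assumption: a limit of zero or less means "no limit".
		if maxMessages > 0 && summary.TotalMessages >= maxMessages {
			break
		}
		// Assumption: non-matching messages are counted but not printed.
		if filter != nil && !filter.MatchString(value) {
			summary.TotalMessagesFiltered++
			continue
		}
		fmt.Println(value)
		summary.TotalMessages++
	}
	return summary, nil
}

func main() {
	values := []string{"order-1", "debug", "order-2", "order-3"}
	summary, err := tailValues(values, 2, "^order-")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", summary)
	// prints "order-1", "order-2", then
	// "{TotalMessages:2 TotalMessagesFiltered:1}"
}
```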
