batch

package
v0.0.0-...-7c007b9

This package is not in the latest version of its module.
Published: Apr 3, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package batch groups points into batches, so that multiple consecutive points of the same metric are sent to the TSDB together.

It relies on a temporary store with fast random access, either an in-memory store or Redis.

In the temporary store, new points are appended to a per-metric key. For each metric, one SquirrelDB instance is the owner and flushes the points once a deadline is reached.

The owning SquirrelDB instance is the one that appended a point to an empty metric key.

Ownership can be:

* dropped by the current owner when the metric key becomes empty
* transferred when the current owner is shutting down
* taken over by any SquirrelDB instance if the metric's flush deadline is overdue (which means the current owner is dead)

A SquirrelDB instance may also flush a metric if its number of points exceeds a threshold (which can happen during backlog processing). This flush may be done by a non-owner, since there is no guarantee that the owner will receive points for that metric.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch receives a stream of points and sends batches of points to the writer. It uses a memory store to keep points before flushing them to the writer. It also allows reading points, merging values from the persistent store (reader) and the memory store.

func New

func New(
	reg prometheus.Registerer,
	batchSize time.Duration,
	memoryStore TemporaryStore,
	reader types.MetricReader,
	writer types.MetricWriter,
	logger zerolog.Logger,
) *Batch

New creates a new Batch object.

func (*Batch) Flush

func (b *Batch) Flush() error

Flush forces writing all in-memory (or in-Redis) metrics from this SquirrelDB instance to the TSDB.

func (*Batch) ReadIter

func (b *Batch) ReadIter(ctx context.Context, request types.MetricRequest) (types.MetricDataSet, error)

ReadIter returns the deduplicated and sorted points read from the temporary and persistent storage according to the request.
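Per metric, the merge that ReadIter describes can be sketched as follows. This is an illustrative sketch only: the point struct and mergeDedup are hypothetical stand-ins for SquirrelDB's types package and its actual iterator.

```go
package main

import (
	"fmt"
	"sort"
)

// point is a hypothetical timestamp/value pair; the real definitions
// live in SquirrelDB's types package.
type point struct {
	ts    int64
	value float64
}

// mergeDedup sketches what ReadIter does for one metric: combine the points
// read from the persistent store and the temporary store, sort them by
// timestamp, and keep a single point per timestamp.
func mergeDedup(persistent, temporary []point) []point {
	merged := append(append([]point{}, persistent...), temporary...)
	sort.Slice(merged, func(i, j int) bool { return merged[i].ts < merged[j].ts })

	out := merged[:0] // in-place filter over the sorted slice
	for _, p := range merged {
		if len(out) == 0 || out[len(out)-1].ts != p.ts {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	persistent := []point{{10, 1.0}, {20, 2.0}}
	temporary := []point{{20, 2.0}, {30, 3.0}} // point at ts=20 is duplicated
	fmt.Println(mergeDedup(persistent, temporary)) // [{10 1} {20 2} {30 3}]
}
```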

func (*Batch) Run

func (b *Batch) Run(ctx context.Context)

Run starts the Batch service (e.g. flushing points after a deadline).

func (*Batch) Write

func (b *Batch) Write(ctx context.Context, metrics []types.MetricData) error

Write implements MetricWriter.

type TemporaryStore

type TemporaryStore interface {
	// Append adds points to the in-memory store and returns the number of points in the store
	// (after the append) for each metric.
	Append(ctx context.Context, points []types.MetricData) ([]int, error)

	// GetSetPointsAndOffset does an atomic read-and-set on the metric points (atomic per metric).
	// It also sets the offset for each metric and adds the metric IDs to the known metrics.
	GetSetPointsAndOffset(ctx context.Context, points []types.MetricData, offsets []int) ([]types.MetricData, error)

	// ReadPointsAndOffset returns the points and write offset for the given metrics.
	ReadPointsAndOffset(ctx context.Context, ids []types.MetricID) ([]types.MetricData, []int, error)

	// MarkToExpire marks the points, write offset and flush deadline of the given metrics to expire.
	// This is used to delete those entries. They should only be deleted when empty, but since
	// Redis doesn't support this, an expiration longer than the flush deadline is used to ensure
	// that only empty and no-longer-used metrics are deleted.
	// It also forgets the metrics from the known metrics.
	// The expiration is removed by GetSetPointsAndOffset and GetSetFlushDeadline.
	MarkToExpire(ctx context.Context, ids []types.MetricID, ttl time.Duration) error

	// GetSetFlushDeadline does an atomic read-and-set on the metric flush deadline (atomic per metric).
	GetSetFlushDeadline(ctx context.Context, deadlines map[types.MetricID]time.Time) (map[types.MetricID]time.Time, error)

	// AddToTransfert adds the metrics to the list of metrics whose ownership should be
	// transferred from one SquirrelDB instance to another.
	AddToTransfert(ctx context.Context, ids []types.MetricID) error

	// GetTransfert returns and removes up to count metrics from the list of metrics to transfer.
	GetTransfert(ctx context.Context, count int) (map[types.MetricID]time.Time, error)

	// GetAllKnownMetrics returns all known metrics with their flush deadline.
	GetAllKnownMetrics(ctx context.Context) (map[types.MetricID]time.Time, error)
}

TemporaryStore is an interface to the temporary points associated with metrics.
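The per-metric counts returned by Append are what let a caller detect ownership: if the count after the append equals the number of points it just appended, the key was empty before, so this instance becomes the owner. A minimal in-memory sketch, with metricID, metricData and memStore as hypothetical stand-ins for the real types:

```go
package main

import (
	"fmt"
	"sync"
)

// Minimal stand-ins for types.MetricID and types.MetricData; the real
// definitions live in SquirrelDB's types package.
type metricID int64

type metricData struct {
	id     metricID
	points []float64 // real points carry timestamps; simplified here
}

// memStore sketches the in-memory flavour of TemporaryStore's Append:
// points are appended under a per-metric key, and the total count after
// the append is returned for each metric.
type memStore struct {
	mu     sync.Mutex
	points map[metricID][]float64
}

func (s *memStore) Append(batch []metricData) []int {
	s.mu.Lock()
	defer s.mu.Unlock()

	counts := make([]int, len(batch))
	for i, m := range batch {
		s.points[m.id] = append(s.points[m.id], m.points...)
		counts[i] = len(s.points[m.id])
	}
	return counts
}

func main() {
	s := &memStore{points: map[metricID][]float64{}}

	// Count equals the number of appended points: the key was empty,
	// so this instance would claim ownership of metric 1.
	counts := s.Append([]metricData{{id: 1, points: []float64{1, 2}}})
	fmt.Println(counts) // [2]

	counts = s.Append([]metricData{{id: 1, points: []float64{3}}, {id: 2, points: []float64{4}}})
	fmt.Println(counts) // [3 1]
}
```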
