compactor

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2024 License: AGPL-3.0 Imports: 49 Imported by: 0

Documentation

Overview

SPDX-License-Identifier: AGPL-3.0-only Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/compactor.go Provenance-includes-license: Apache-2.0 Provenance-includes-copyright: The Cortex Authors.

SPDX-License-Identifier: AGPL-3.0-only Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/job.go Provenance-includes-license: Apache-2.0 Provenance-includes-copyright: The Cortex Authors.

SPDX-License-Identifier: AGPL-3.0-only Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/split_merge_compactor.go Provenance-includes-license: Apache-2.0 Provenance-includes-copyright: The Cortex Authors.

Index

Constants

View Source
const (
	CompactionSplitByFingerprint         = "fingerprint"
	CompactionSplitByStacktracePartition = "stacktracePartition"
)
View Source
const (
	CompactionOrderOldestFirst = "smallest-range-oldest-blocks-first"
	CompactionOrderNewestFirst = "newest-blocks-first"
)

Variables

Functions

func DefaultGroupKey

func DefaultGroupKey(meta block.Meta) string

DefaultGroupKey returns a unique identifier for the group the block belongs to, based on the DefaultGrouper logic. It considers the downsampling resolution and the block's labels.

Types

type BlockCompactor

type BlockCompactor struct {
	// contains filtered or unexported fields
}

func (*BlockCompactor) CompactWithSplitting

func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string, dirs []string, shardCount, stageSize uint64) ([]ulid.ULID, error)

type BlocksCleaner

type BlocksCleaner struct {
	services.Service
	// contains filtered or unexported fields
}

func NewBlocksCleaner

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner

type BlocksCleanerConfig

type BlocksCleanerConfig struct {
	DeletionDelay              time.Duration
	CleanupInterval            time.Duration
	CleanupConcurrency         int
	TenantCleanupDelay         time.Duration // Delay before removing tenant deletion mark and "debug".
	DeleteBlocksConcurrency    int
	NoBlocksFileCleanupEnabled bool
}

type BlocksCompactorFactory

type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	cfgProvider ConfigProvider,
	userID string,
	logger log.Logger,
	metrics *CompactorMetrics,
) (Compactor, error)

BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.

type BlocksGrouperFactory

type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	cfgProvider ConfigProvider,
	userID string,
	logger log.Logger,
	reg prometheus.Registerer,
) Grouper

BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.

type BlocksPlannerFactory added in v1.3.0

type BlocksPlannerFactory func(
	cfg Config,
) Planner

BlocksPlannerFactory builds and returns the compactor and planner to use to compact a tenant's blocks.

type BucketCompactor

type BucketCompactor struct {
	// contains filtered or unexported fields
}

BucketCompactor compacts blocks in a bucket.

func NewBucketCompactor

func NewBucketCompactor(
	logger log.Logger,
	sy *Syncer,
	grouper Grouper,
	planner Planner,
	comp Compactor,
	compactDir string,
	bkt objstore.Bucket,
	concurrency int,
	ownJob ownCompactionJobFunc,
	sortJobs JobsOrderFunc,
	waitPeriod time.Duration,
	blockSyncConcurrency int,
	metrics *BucketCompactorMetrics,
) (*BucketCompactor, error)

NewBucketCompactor creates a new bucket compactor.

func (*BucketCompactor) Compact

func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Duration) (rerr error)

Compact runs compaction over bucket. If maxCompactionTime is positive then after this time no more new compactions are started.

type BucketCompactorMetrics

type BucketCompactorMetrics struct {
	// contains filtered or unexported fields
}

BucketCompactorMetrics holds the metrics tracked by BucketCompactor.

func NewBucketCompactorMetrics

func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg prometheus.Registerer) *BucketCompactorMetrics

NewBucketCompactorMetrics makes a new BucketCompactorMetrics.

type Compactor

type Compactor interface {
	// CompactWithSplitting merges and splits the source blocks into shardCount number of compacted blocks,
	// and returns slice of block IDs.
	// If given compacted block has no series, corresponding block ID will not be returned.
	CompactWithSplitting(ctx context.Context, dst string, dirs []string, shardCount, stageSize uint64) (result []ulid.ULID, _ error)
}

Compactor provides compaction against an underlying storage of profiling data.

type CompactorMetrics

type CompactorMetrics struct {
	Ran               *prometheus.CounterVec
	InProgress        *prometheus.GaugeVec
	OverlappingBlocks prometheus.Counter
	Duration          *prometheus.HistogramVec
	Size              *prometheus.HistogramVec
	Samples           *prometheus.HistogramVec
	Range             *prometheus.HistogramVec
	Split             *prometheus.HistogramVec
}

type Config

type Config struct {
	BlockRanges                DurationList  `yaml:"block_ranges" category:"advanced"`
	BlockSyncConcurrency       int           `yaml:"block_sync_concurrency" category:"advanced"`
	MetaSyncConcurrency        int           `yaml:"meta_sync_concurrency" category:"advanced"`
	DataDir                    string        `yaml:"data_dir"`
	CompactionInterval         time.Duration `yaml:"compaction_interval" category:"advanced"`
	CompactionRetries          int           `yaml:"compaction_retries" category:"advanced"`
	CompactionConcurrency      int           `yaml:"compaction_concurrency" category:"advanced"`
	CompactionWaitPeriod       time.Duration `yaml:"first_level_compaction_wait_period"`
	CleanupInterval            time.Duration `yaml:"cleanup_interval" category:"advanced"`
	CleanupConcurrency         int           `yaml:"cleanup_concurrency" category:"advanced"`
	DeletionDelay              time.Duration `yaml:"deletion_delay" category:"advanced"`
	TenantCleanupDelay         time.Duration `yaml:"tenant_cleanup_delay" category:"advanced"`
	MaxCompactionTime          time.Duration `yaml:"max_compaction_time" category:"advanced"`
	NoBlocksFileCleanupEnabled bool          `yaml:"no_blocks_file_cleanup_enabled" category:"experimental"`
	DownsamplerEnabled         bool          `yaml:"downsampler_enabled" category:"advanced"`

	// Compactor concurrency options
	MaxOpeningBlocksConcurrency int `yaml:"max_opening_blocks_concurrency" category:"advanced"` // Number of goroutines opening blocks before compaction.

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`

	// Compactors sharding.
	ShardingRing RingConfig `yaml:"sharding_ring"`

	CompactionJobsOrder string `yaml:"compaction_jobs_order" category:"advanced"`
	CompactionSplitBy   string `yaml:"compaction_split_by" category:"advanced"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
	BlocksPlannerFactory   BlocksPlannerFactory   `yaml:"-"`
	// contains filtered or unexported fields
}

Config holds the MultitenantCompactor config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the MultitenantCompactor flags.

func (*Config) Validate

func (cfg *Config) Validate(maxBlockDuration time.Duration) error

type ConfigProvider

type ConfigProvider interface {
	objstore.TenantConfigProvider

	// CompactorBlocksRetentionPeriod returns the retention period for a given user.
	CompactorBlocksRetentionPeriod(user string) time.Duration

	// CompactorSplitAndMergeShards returns the number of shards to use when splitting blocks.
	CompactorSplitAndMergeShards(userID string) int

	// CompactorSplitAndMergeStageSize returns the number of stages split shards will be written to.
	CompactorSplitAndMergeStageSize(userID string) int

	// CompactorSplitGroups returns the number of groups that blocks used for splitting should
	// be grouped into. Different groups are then split by different jobs.
	CompactorSplitGroups(userID string) int

	// CompactorTenantShardSize returns number of compactors that this user can use. 0 = all compactors.
	CompactorTenantShardSize(userID string) int

	// CompactorPartialBlockDeletionDelay returns the partial block delay time period for a given user,
	// and whether the configured value was valid. If the value wasn't valid, the returned delay is the default one
	// and the caller is responsible to warn the Mimir operator about it.
	CompactorPartialBlockDeletionDelay(userID string) (delay time.Duration, valid bool)

	// CompactorDownsamplerEnabled returns true if the downsampler is enabled for a given user.
	CompactorDownsamplerEnabled(userId string) bool
}

ConfigProvider defines the per-tenant config provider for the MultitenantCompactor.

type DeduplicateFilter

type DeduplicateFilter interface {
	block.MetadataFilter

	// DuplicateIDs returns IDs of duplicate blocks generated by last call to Filter method.
	DuplicateIDs() []ulid.ULID
}

type DurationList

type DurationList []time.Duration

DurationList is the block ranges for a tsdb

func (*DurationList) Set

func (d *DurationList) Set(s string) error

Set implements the flag.Value interface

func (*DurationList) String

func (d *DurationList) String() string

String implements the flag.Value interface

func (*DurationList) ToMilliseconds

func (d *DurationList) ToMilliseconds() []int64

ToMilliseconds returns the duration list in milliseconds

type Grouper

type Grouper interface {
	// Groups returns the compaction jobs for all blocks currently known to the syncer.
	// It creates all jobs from the scratch on every call.
	Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)
}

Grouper is responsible to group all known blocks into compaction Job which are safe to be compacted concurrently.

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job holds a compaction job, which consists of a group of blocks that should be compacted together. Not goroutine safe.

func NewJob

func NewJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards, splitStageSize uint32, shardingKey string) *Job

NewJob returns a new compaction Job.

func (*Job) AppendMeta

func (job *Job) AppendMeta(meta *block.Meta) error

AppendMeta the block with the given meta to the job.

func (*Job) IDs

func (job *Job) IDs() (ids []ulid.ULID)

IDs returns all sorted IDs of blocks in the job.

func (*Job) Key

func (job *Job) Key() string

Key returns an identifier for the job.

func (*Job) Labels

func (job *Job) Labels() labels.Labels

Labels returns the external labels for the output block(s) of this job.

func (*Job) MaxTime

func (job *Job) MaxTime() int64

MaxTime returns the max time across all job's blocks.

func (*Job) Metas

func (job *Job) Metas() []*block.Meta

Metas returns the metadata for each block that is part of this job, ordered by the block's MinTime

func (*Job) MinCompactionLevel

func (job *Job) MinCompactionLevel() int

MinCompactionLevel returns the minimum compaction level across all source blocks in this job.

func (*Job) MinTime

func (job *Job) MinTime() int64

MinTime returns the min time across all job's blocks.

func (*Job) Resolution

func (job *Job) Resolution() int64

Resolution returns the common downsampling resolution of blocks in the job.

func (*Job) ShardingKey

func (job *Job) ShardingKey() string

ShardingKey returns the key used to shard this job across multiple instances.

func (*Job) SplitStageSize

func (job *Job) SplitStageSize() uint32

SplitStageSize returns the number of stages split shards will be written to.

func (*Job) SplittingShards

func (job *Job) SplittingShards() uint32

SplittingShards returns the number of output shards to build if splitting is enabled.

func (*Job) String

func (job *Job) String() string

func (*Job) UseSplitting

func (job *Job) UseSplitting() bool

UseSplitting returns whether blocks should be split into multiple shards when compacted.

func (*Job) UserID

func (job *Job) UserID() string

UserID returns the user/tenant to which this job belongs to.

type JobsOrderFunc

type JobsOrderFunc func(jobs []*Job) []*Job

func GetJobsOrderFunction

func GetJobsOrderFunction(name string) JobsOrderFunc

GetJobsOrderFunction returns jobs ordering function, or nil, if name doesn't refer to any function.

type LabelRemoverFilter

type LabelRemoverFilter struct {
	// contains filtered or unexported fields
}

func NewLabelRemoverFilter

func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter

NewLabelRemoverFilter creates a LabelRemoverFilter.

func (*LabelRemoverFilter) Filter

func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*block.Meta, _ block.GaugeVec) error

Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.

type MultitenantCompactor

type MultitenantCompactor struct {
	services.Service
	// contains filtered or unexported fields
}

MultitenantCompactor is a multi-tenant TSDB blocks compactor based on Thanos.

func NewMultitenantCompactor

func NewMultitenantCompactor(compactorCfg Config, bucketClient objstore.Bucket, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*MultitenantCompactor, error)

NewMultitenantCompactor makes a new MultitenantCompactor.

func (*MultitenantCompactor) RingHandler

func (c *MultitenantCompactor) RingHandler(w http.ResponseWriter, req *http.Request)

type NoCompactionMarkFilter

type NoCompactionMarkFilter struct {
	// contains filtered or unexported fields
}

NoCompactionMarkFilter is a block.Fetcher filter that finds all blocks with no-compact marker files, and optionally removes them from synced metas.

func NewNoCompactionMarkFilter

func NewNoCompactionMarkFilter(bkt objstore.BucketReader, removeNoCompactBlocks bool) *NoCompactionMarkFilter

NewNoCompactionMarkFilter creates NoCompactionMarkFilter.

func (*NoCompactionMarkFilter) Filter

func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter finds blocks that should not be compacted, and fills f.noCompactMarkedMap. If f.removeNoCompactBlocks is true, blocks are also removed from metas. (Thanos version of the filter doesn't do removal).

func (*NoCompactionMarkFilter) NoCompactMarkedBlocks

func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{}

NoCompactMarkedBlocks returns block ids that were marked for no compaction. It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.

type Planner

type Planner interface {
	// Plan returns a list of blocks that should be compacted into single one.
	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
	Plan(ctx context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)
}

Planner returns blocks to compact.

type RingConfig

type RingConfig struct {
	Common util.CommonRingConfig `yaml:",inline"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout" category:"advanced"`

	ObservePeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

type ShardAwareDeduplicateFilter

type ShardAwareDeduplicateFilter struct {
	// contains filtered or unexported fields
}

ShardAwareDeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. Not go-routine safe.

func NewShardAwareDeduplicateFilter

func NewShardAwareDeduplicateFilter() *ShardAwareDeduplicateFilter

NewShardAwareDeduplicateFilter creates ShardAwareDeduplicateFilter.

func (*ShardAwareDeduplicateFilter) DuplicateIDs

func (f *ShardAwareDeduplicateFilter) DuplicateIDs() []ulid.ULID

DuplicateIDs returns slice of block ids that are filtered out by ShardAwareDeduplicateFilter.

func (*ShardAwareDeduplicateFilter) Filter

func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter filters out from metas, the initial map of blocks, all the blocks that are contained in other, compacted, blocks. The removed blocks are source blocks of the blocks that remain in metas after the filtering is executed.

type SplitAndMergeGrouper

type SplitAndMergeGrouper struct {
	// contains filtered or unexported fields
}

func NewSplitAndMergeGrouper

func NewSplitAndMergeGrouper(
	userID string,
	ranges []int64,
	shardCount uint32,
	splitStageSize uint32,
	splitGroupsCount uint32,
	logger log.Logger,
) *SplitAndMergeGrouper

NewSplitAndMergeGrouper makes a new SplitAndMergeGrouper. The provided ranges must be sorted. If shardCount is 0, the splitting stage is disabled.

func (*SplitAndMergeGrouper) Groups

func (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)

type SplitAndMergePlanner

type SplitAndMergePlanner struct {
	// contains filtered or unexported fields
}

func NewSplitAndMergePlanner

func NewSplitAndMergePlanner(ranges []int64) *SplitAndMergePlanner

func (*SplitAndMergePlanner) Plan

func (c *SplitAndMergePlanner) Plan(_ context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)

Plan implements compact.Planner.

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

Syncer synchronizes block metas from a bucket into a local directory. It sorts them into compaction groups based on equal label sets.

func NewMetaSyncer

func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error)

NewMetaSyncer returns a new Syncer for the given Bucket and directory. Blocks must be at least as old as the sync delay for being considered.

func (*Syncer) GarbageCollect

func (s *Syncer) GarbageCollect(ctx context.Context) error

GarbageCollect marks blocks for deletion from bucket if their data is available as part of a block with a higher compaction level. Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.

func (*Syncer) Metas

func (s *Syncer) Metas() map[ulid.ULID]*block.Meta

Metas returns loaded metadata blocks since last sync.

func (*Syncer) SyncMetas

func (s *Syncer) SyncMetas(ctx context.Context) error

SyncMetas synchronizes local state of block metas with what we have in the bucket.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL