package distributor
Version: v1.16.0
Published: Nov 19, 2023 License: Apache-2.0 Imports: 45 Imported by: 22

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewPool added in v1.1.0

func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool

Types

type Config

type Config struct {
	PoolConfig PoolConfig `yaml:"pool"`

	HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

	MaxRecvMsgSize  int           `yaml:"max_recv_msg_size"`
	RemoteTimeout   time.Duration `yaml:"remote_timeout"`
	ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

	ShardingStrategy         string `yaml:"sharding_strategy"`
	ShardByAllLabels         bool   `yaml:"shard_by_all_labels"`
	ExtendWrites             bool   `yaml:"extend_writes"`
	SignWriteRequestsEnabled bool   `yaml:"sign_write_requests"`

	// Distributors ring
	DistributorRing RingConfig `yaml:"ring"`

	// for testing and for extending the ingester by adding calls to the client
	IngesterClientFactory ring_client.PoolFactory `yaml:"-"`

	// when true the distributor does not validate the label name, Cortex doesn't directly use
	// this (and should never use it) but this feature is used by other projects built on top of it
	SkipLabelNameValidation bool `yaml:"-"`

	// This config is dynamically injected because defined in the querier config.
	ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

	// Limits for distributor
	InstanceLimits InstanceLimits `yaml:"instance_limits"`
}

Config contains the configuration required to create a Distributor

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure the distributor to the given FlagSet.

func (*Config) Validate added in v0.4.0

func (cfg *Config) Validate(limits validation.Limits) error

Validate validates the config and returns an error on failure.

type Distributor

type Distributor struct {
	services.Service

	// For handling HA replicas.
	HATracker *ha.HATracker
	// contains filtered or unexported fields
}

Distributor is a storage.SampleAppender and a client.Querier which forwards appends and queries to individual ingesters.

func New

func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error)

New constructs a new Distributor

func (*Distributor) AllUserStats

func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)

AllUserStats returns statistics about all users. Note that, unlike UserStats(), it does not divide the results by the ReplicationFactor.
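The difference between the two methods can be illustrated with a minimal sketch. It assumes that every series is written to ReplicationFactor ingesters, so a plain sum over ingesters counts each series that many times. The `UserStats` type below is a trimmed local stand-in, and `sumStats` and `divideByReplicationFactor` are hypothetical helpers, not functions of this package.

```go
package main

import "fmt"

// UserStats is a local stand-in with a subset of the documented fields.
type UserStats struct {
	IngestionRate float64
	NumSeries     uint64
}

// sumStats adds up per-ingester statistics without any correction,
// which is the behaviour documented for AllUserStats.
func sumStats(perIngester []UserStats) UserStats {
	var total UserStats
	for _, s := range perIngester {
		total.IngestionRate += s.IngestionRate
		total.NumSeries += s.NumSeries
	}
	return total
}

// divideByReplicationFactor is a hypothetical helper that applies the
// correction documented for UserStats. It assumes each series is stored
// on exactly rf ingesters.
func divideByReplicationFactor(total UserStats, rf int) UserStats {
	if rf <= 0 {
		return total
	}
	return UserStats{
		IngestionRate: total.IngestionRate / float64(rf),
		NumSeries:     total.NumSeries / uint64(rf),
	}
}

func main() {
	// Three ingesters, each holding a full replica of the same tenant data.
	perIngester := []UserStats{
		{IngestionRate: 100, NumSeries: 3000},
		{IngestionRate: 100, NumSeries: 3000},
		{IngestionRate: 100, NumSeries: 3000},
	}
	raw := sumStats(perIngester)
	adjusted := divideByReplicationFactor(raw, 3)
	fmt.Printf("raw sum:  %+v\n", raw)
	fmt.Printf("adjusted: %+v\n", adjusted)
}
```

Under these assumptions, the raw sum is three times larger than the adjusted figure, which is worth keeping in mind when comparing the output of the two endpoints.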

func (*Distributor) AllUserStatsHandler

func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)

AllUserStatsHandler shows stats for all users.

func (*Distributor) ForReplicationSet added in v1.5.0

func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error)

ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
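The fan-out pattern described above can be sketched with the standard library alone. This is not the package's implementation: `instance`, `forEachInstance`, and `queryAll` are hypothetical names, and the sketch fails on the first error it finds, whereas a real replication set may tolerate some failed ingesters.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// instance is a local stand-in for one ingester in a replication set.
type instance struct{ Addr string }

// forEachInstance runs f concurrently for every instance and returns the
// results in input order. Any error makes the whole call fail (an
// assumption of this sketch, not documented behaviour).
func forEachInstance(ctx context.Context, instances []instance,
	f func(context.Context, instance) (interface{}, error)) ([]interface{}, error) {

	results := make([]interface{}, len(instances))
	errs := make([]error, len(instances))

	var wg sync.WaitGroup
	for i, inst := range instances {
		wg.Add(1)
		go func(i int, inst instance) {
			defer wg.Done()
			// Each goroutine writes only to its own slot, so no lock is needed.
			results[i], errs[i] = f(ctx, inst)
		}(i, inst)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

// queryAll is a hypothetical caller that asks every instance for a value.
func queryAll(addrs []string) ([]interface{}, error) {
	instances := make([]instance, 0, len(addrs))
	for _, a := range addrs {
		instances = append(instances, instance{Addr: a})
	}
	return forEachInstance(context.Background(), instances,
		func(_ context.Context, inst instance) (interface{}, error) {
			if inst.Addr == "" {
				return nil, errors.New("empty address")
			}
			return "series from " + inst.Addr, nil
		})
}

func main() {
	res, err := queryAll([]string{"ingester-1", "ingester-2", "ingester-3"})
	fmt.Println(res, err)
}
```

Returning `[]interface{}` mirrors the documented signature and leaves it to the caller to type-assert each element back to the concrete response type.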

func (*Distributor) GetIngestersForMetadata added in v1.5.0

func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)

GetIngestersForMetadata returns a replication set including all ingesters that should be queried to fetch metadata (e.g. label names/values or series).

func (*Distributor) GetIngestersForQuery added in v1.5.0

func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)

GetIngestersForQuery returns a replication set including all ingesters that should be queried to fetch series matching input label matchers.

func (*Distributor) LabelNames

func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error)

LabelNames returns all the label names.

func (*Distributor) LabelNamesCommon added in v1.13.0

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error)

func (*Distributor) LabelNamesStream added in v1.13.0

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error)

func (*Distributor) LabelValuesForLabelName

func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)

LabelValuesForLabelName returns all the label values that are associated with a given label name.

func (*Distributor) LabelValuesForLabelNameCommon added in v1.13.0

func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error)

func (*Distributor) LabelValuesForLabelNameStream added in v1.13.0

func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)

LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.

func (*Distributor) MetricsForLabelMatchers

func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)

MetricsForLabelMatchers returns the metrics that match the given matchers.

func (*Distributor) MetricsForLabelMatchersStream added in v1.13.0

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)

func (*Distributor) MetricsMetadata added in v1.1.0

func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)

MetricsMetadata returns all metric metadata of a user.

func (*Distributor) Push

Push implements client.IngesterServer

func (*Distributor) Query

func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)

Query queries multiple ingesters and returns a Matrix of samples.

func (*Distributor) QueryExemplars added in v1.10.0

func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error)

func (*Distributor) QueryStream

func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)

QueryStream queries multiple ingesters via the streaming interface and returns the combined set of chunks.

func (*Distributor) ServeHTTP added in v1.10.0

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Distributor) UserStats

func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)

UserStats returns statistics about the current user.

func (*Distributor) UserStatsHandler

func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)

UserStatsHandler handles user stats requests to the Distributor.

type HATrackerConfig

type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

	KVStore kv.Config `` /* 190-byte string literal not displayed */
}
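The two timeout comments in the struct above describe a small decision rule, sketched here with local stand-in types. `haTimeouts`, `decision`, and `decide` are hypothetical names. Rejecting samples from a non-elected replica before the failover timeout is an assumption of this sketch, and UpdateTimeoutJitterMax is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// haTimeouts mirrors the two documented HATrackerConfig durations.
type haTimeouts struct {
	UpdateTimeout   time.Duration
	FailoverTimeout time.Duration
}

// decision names the outcomes used by this sketch.
type decision string

const (
	acceptAndRefresh decision = "accept, refresh stored timestamp"
	acceptOnly       decision = "accept, keep stored timestamp"
	failover         decision = "fail over to the incoming replica"
	reject           decision = "reject sample"
)

// decide applies the two documented rules. sinceStored is the difference
// between the stored timestamp and the time the sample was received.
func decide(cfg haTimeouts, elected, incoming string, sinceStored time.Duration) decision {
	if incoming == elected {
		// Only update the timestamp once it is older than UpdateTimeout.
		if sinceStored > cfg.UpdateTimeout {
			return acceptAndRefresh
		}
		return acceptOnly
	}
	// Only switch replicas once the elected one has been silent for
	// longer than FailoverTimeout.
	if sinceStored > cfg.FailoverTimeout {
		return failover
	}
	return reject // assumption: other replicas are dropped until failover
}

func main() {
	// Illustrative values only, not the package defaults.
	cfg := haTimeouts{UpdateTimeout: 15 * time.Second, FailoverTimeout: 30 * time.Second}
	fmt.Println(decide(cfg, "replica-a", "replica-a", 20*time.Second))
	fmt.Println(decide(cfg, "replica-a", "replica-b", 20*time.Second))
	fmt.Println(decide(cfg, "replica-a", "replica-b", 31*time.Second))
}
```

In this reading, keeping FailoverTimeout larger than UpdateTimeout avoids switching replicas while the elected one is still sending samples but has not yet refreshed its timestamp.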

func (*HATrackerConfig) RegisterFlags

func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure the HA tracker to the given FlagSet.

func (*HATrackerConfig) ToHATrackerConfig added in v1.16.0

func (cfg *HATrackerConfig) ToHATrackerConfig() ha.HATrackerConfig

type InstanceLimits added in v1.9.0

type InstanceLimits struct {
	MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
	MaxInflightPushRequests int     `yaml:"max_inflight_push_requests"`
}

type PoolConfig added in v1.1.0

type PoolConfig struct {
	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period"`
	HealthCheckIngesters bool          `yaml:"health_check_ingesters"`
	RemoteTimeout        time.Duration `yaml:"-"`
}

PoolConfig is config for creating a Pool.

func (*PoolConfig) RegisterFlags added in v1.1.0

func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure the pool to the given FlagSet.
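The RegisterFlags pattern used by the config types in this package can be sketched with the standard `flag` package. The `PoolConfig` below is a trimmed local copy, and the flag names, defaults, and help strings are illustrative assumptions rather than the ones the package registers. `parsePoolFlags` is a hypothetical helper.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// PoolConfig is a local copy of two of the documented fields.
type PoolConfig struct {
	ClientCleanupPeriod  time.Duration
	HealthCheckIngesters bool
}

// RegisterFlags binds each field to a flag on the given FlagSet.
// Flag names and defaults here are assumptions made for this example.
func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(&cfg.ClientCleanupPeriod, "example.client-cleanup-period",
		15*time.Second, "How often stale ingester clients are cleaned up.")
	f.BoolVar(&cfg.HealthCheckIngesters, "example.health-check-ingesters",
		true, "Whether to health-check ingester clients during cleanup.")
}

// parsePoolFlags registers the flags on a fresh FlagSet and parses args.
func parsePoolFlags(args []string) (PoolConfig, error) {
	var cfg PoolConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parsePoolFlags([]string{"-example.client-cleanup-period=30s"})
	fmt.Printf("%+v %v\n", cfg, err)
}
```

Taking a `*flag.FlagSet` instead of using the global flag set lets a parent config call RegisterFlags on each nested config, and makes the registration easy to exercise in isolation.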

type ReadLifecycler added in v0.6.0

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type RingConfig added in v0.6.0

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags added in v0.6.0

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure the distributors ring to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig added in v0.6.0

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a LifecyclerConfig based on the distributor ring config.

func (*RingConfig) ToRingConfig added in v1.10.0

func (cfg *RingConfig) ToRingConfig() ring.Config

type UserIDStats

type UserIDStats struct {
	UserID string `json:"userID"`
	UserStats
}

UserIDStats models ingestion statistics for one user, including the user ID

type UserStats

type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`
	ActiveSeries      uint64  `json:"activeSeries"`
}

UserStats models ingestion statistics for one user.
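Because UserIDStats embeds UserStats, `encoding/json` promotes the embedded fields to the top level of the encoded object. The sketch below copies the two documented types locally to show the resulting shape. The `encode` helper and the sample values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UserStats and UserIDStats are local copies of the documented types.
type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`
	ActiveSeries      uint64  `json:"activeSeries"`
}

type UserIDStats struct {
	UserID string `json:"userID"`
	UserStats
}

// encode is a hypothetical helper that marshals one entry to a JSON string.
func encode(s UserIDStats) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encode(UserIDStats{
		UserID: "tenant-a",
		UserStats: UserStats{
			IngestionRate:     12.5,
			NumSeries:         100,
			APIIngestionRate:  10,
			RuleIngestionRate: 2.5,
			ActiveSeries:      80,
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// {"userID":"tenant-a","ingestionRate":12.5,"numSeries":100,"APIIngestionRate":10,"RuleIngestionRate":2.5,"activeSeries":80}
}
```

Note that the JSON keys are not uniformly cased: `APIIngestionRate` and `RuleIngestionRate` start with an upper-case letter while the others are lower camel case, so clients should match the tags exactly.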
