Version: v1.9.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2021 License: Apache-2.0 Imports: 52 Imported by: 9




This section is empty.


View Source
var (
	ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHaTracker   = fmt.Errorf("proto: integer overflow")


func GetReplicaDescCodec

func GetReplicaDescCodec() codec.Proto

func NewPool

func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool

func ProtoReplicaDescFactory

func ProtoReplicaDescFactory() proto.Message

ProtoReplicaDescFactory makes new InstanceDescs


type Config

type Config struct {
	PoolConfig PoolConfig `yaml:"pool"`

	HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

	MaxRecvMsgSize  int           `yaml:"max_recv_msg_size"`
	RemoteTimeout   time.Duration `yaml:"remote_timeout"`
	ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

	ShardingStrategy string `yaml:"sharding_strategy"`
	ShardByAllLabels bool   `yaml:"shard_by_all_labels"`
	ExtendWrites     bool   `yaml:"extend_writes"`

	// Distributors ring
	DistributorRing RingConfig `yaml:"ring"`

	// for testing and for extending the ingester by adding calls to the client
	IngesterClientFactory ring_client.PoolFactory `yaml:"-"`

	// when true the distributor does not validate the label name, Cortex doesn't directly use
	// this (and should never use it) but this feature is used by other projects built on top of it
	SkipLabelNameValidation bool `yaml:"-"`

	// This config is dynamically injected because defined in the querier config.
	ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

	// Limits for distributor
	InstanceLimits InstanceLimits `yaml:"instance_limits"`

Config contains the configuration required to create a Distributor

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*Config) Validate

func (cfg *Config) Validate(limits validation.Limits) error

Validate config and returns error on failure

type Distributor

type Distributor struct {

	// For handling HA replicas.
	HATracker *haTracker
	// contains filtered or unexported fields

Distributor is a storage.SampleAppender and a client.Querier which forwards appends and queries to individual ingesters.

func New

func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error)

New constructs a new Distributor

func (*Distributor) AllUserStats

func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)

AllUserStats returns statistics about all users. Note it does not divide by the ReplicationFactor like UserStats()

func (*Distributor) AllUserStatsHandler

func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)

AllUserStatsHandler shows stats for all users.

func (*Distributor) ForReplicationSet

func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error)

ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.

func (*Distributor) GetIngestersForMetadata

func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)

GetIngestersForMetadata returns a replication set including all ingesters that should be queried to fetch metadata (eg. label names/values or series).

func (*Distributor) GetIngestersForQuery

func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)

GetIngestersForQuery returns a replication set including all ingesters that should be queried to fetch series matching input label matchers.

func (*Distributor) LabelNames

func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error)

LabelNames returns all of the label names.

func (*Distributor) LabelValuesForLabelName

func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)

LabelValuesForLabelName returns all of the label values that are associated with a given label name.

func (*Distributor) MetricsForLabelMatchers

func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)

MetricsForLabelMatchers gets the metrics that match said matchers

func (*Distributor) MetricsMetadata

func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)

MetricsMetadata returns all metric metadata of a user.

func (*Distributor) Push

Push implements client.IngesterServer

func (*Distributor) Query

func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)

Query multiple ingesters and returns a Matrix of samples.

func (*Distributor) QueryStream

func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)

QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.

func (*Distributor) UserStats

func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)

UserStats returns statistics about the current user.

func (*Distributor) UserStatsHandler

func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)

UserStatsHandler handles user stats to the Distributor.

type HATrackerConfig

type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

	KVStore kv.Config `` /* 190-byte string literal not displayed */

HATrackerConfig contains the configuration require to create a HA Tracker.

func (*HATrackerConfig) RegisterFlags

func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*HATrackerConfig) Validate

func (cfg *HATrackerConfig) Validate() error

Validate config and returns error on failure

type InstanceLimits

type InstanceLimits struct {
	MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
	MaxInflightPushRequests int     `yaml:"max_inflight_push_requests"`

type PoolConfig

type PoolConfig struct {
	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period"`
	HealthCheckIngesters bool          `yaml:"health_check_ingesters"`
	RemoteTimeout        time.Duration `yaml:"-"`

PoolConfig is config for creating a Pool.

func (*PoolConfig) RegisterFlags

func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

type ReadLifecycler

type ReadLifecycler interface {
	HealthyInstancesCount() int

ReadLifecycler represents the read interface to the lifecycler.

type ReplicaDesc

type ReplicaDesc struct {
	Replica    string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
	ReceivedAt int64  `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`
	// Unix timestamp in millseconds when this entry was marked for deletion.
	// Reason for doing marking first, and delete later, is to make sure that distributors
	// watching the prefix will receive notification on "marking" -- at which point they can
	// already remove entry from memory. Actual deletion from KV store does *not* trigger
	// "watch" notification with a key for all KV stores.
	DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`

func NewReplicaDesc

func NewReplicaDesc() *ReplicaDesc

NewReplicaDesc returns an empty *distributor.ReplicaDesc.

func (*ReplicaDesc) Descriptor

func (*ReplicaDesc) Descriptor() ([]byte, []int)

func (*ReplicaDesc) Equal

func (this *ReplicaDesc) Equal(that interface{}) bool

func (*ReplicaDesc) GetDeletedAt

func (m *ReplicaDesc) GetDeletedAt() int64

func (*ReplicaDesc) GetReceivedAt

func (m *ReplicaDesc) GetReceivedAt() int64

func (*ReplicaDesc) GetReplica

func (m *ReplicaDesc) GetReplica() string

func (*ReplicaDesc) GoString

func (this *ReplicaDesc) GoString() string

func (*ReplicaDesc) Marshal

func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)

func (*ReplicaDesc) MarshalTo

func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)

func (*ReplicaDesc) MarshalToSizedBuffer

func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ReplicaDesc) ProtoMessage

func (*ReplicaDesc) ProtoMessage()

func (*ReplicaDesc) Reset

func (m *ReplicaDesc) Reset()

func (*ReplicaDesc) Size

func (m *ReplicaDesc) Size() (n int)

func (*ReplicaDesc) String

func (this *ReplicaDesc) String() string

func (*ReplicaDesc) Unmarshal

func (m *ReplicaDesc) Unmarshal(dAtA []byte) error

func (*ReplicaDesc) XXX_DiscardUnknown

func (m *ReplicaDesc) XXX_DiscardUnknown()

func (*ReplicaDesc) XXX_Marshal

func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReplicaDesc) XXX_Merge

func (m *ReplicaDesc) XXX_Merge(src proto.Message)

func (*ReplicaDesc) XXX_Size

func (m *ReplicaDesc) XXX_Size() int

func (*ReplicaDesc) XXX_Unmarshal

func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error

type RingConfig

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`

RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a LifecyclerConfig based on the distributor ring config.

type UserIDStats

type UserIDStats struct {
	UserID string `json:"userID"`

UserIDStats models ingestion statistics for one user, including the user ID

type UserStats

type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`

UserStats models ingestion statistics for one user.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to