distributor

package
v2.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 14, 2023 License: AGPL-3.0 Imports: 46 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRateStore

func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore

Types

type Config

type Config struct {
	// Distributors ring
	DistributorRing RingConfig `yaml:"ring,omitempty"`

	// RateStore customizes the rate storing used by stream sharding.
	RateStore RateStoreConfig `yaml:"rate_store"`

	// WriteFailuresLogging customizes write failures logging behavior.
	WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."`
	// contains filtered or unexported fields
}

Config for a Distributor.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(fs *flag.FlagSet)

RegisterFlags registers distributor-related flags.

type Distributor

type Distributor struct {
	services.Service
	// contains filtered or unexported fields
}

Distributor coordinates replication and distribution of log streams.

func New

func New(
	cfg Config,
	clientCfg client.Config,
	configs *runtime.TenantConfigs,
	ingestersRing ring.ReadRing,
	overrides Limits,
	registerer prometheus.Registerer,
) (*Distributor, error)

New creates a distributor.

func (*Distributor) HealthyInstancesCount

func (d *Distributor) HealthyInstancesCount() int

HealthyInstancesCount implements the ReadLifecycler interface.

We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.
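The division above can be sketched as a small helper. This is only an illustration: effectiveRateLimit is a hypothetical name that does not exist in the package, and the fallback for a zero instance count is an assumption rather than documented behavior.

```go
package main

// effectiveRateLimit splits a global (cluster-wide) rate limit evenly across
// the healthy distributors, following
// EFFECTIVE_RATE_LIMIT = GLOBAL_RATE_LIMIT / NUM_INSTANCES.
// Hypothetical helper for illustration; it is not part of the package.
func effectiveRateLimit(globalLimit float64, healthyInstances int) float64 {
	if healthyInstances <= 0 {
		// Assumption: with no known healthy instances, avoid dividing by zero
		// and fall back to the full global limit.
		return globalLimit
	}
	return globalLimit / float64(healthyInstances)
}
```

With a global limit of 12 and three healthy distributors, each distributor would enforce a limit of 4.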

func (*Distributor) Push

Push a set of streams. The returned error is the last one seen.

func (*Distributor) PushHandler

func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)

PushHandler reads a snappy-compressed proto from the HTTP body.

func (*Distributor) ServeHTTP

func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the distributor ring status page.

If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.

type Limits

type Limits interface {
	retention.Limits
	MaxLineSize(userID string) int
	MaxLineSizeTruncate(userID string) bool
	EnforceMetricName(userID string) bool
	MaxLabelNamesPerSeries(userID string) int
	MaxLabelNameLength(userID string) int
	MaxLabelValueLength(userID string) int

	CreationGracePeriod(userID string) time.Duration
	RejectOldSamples(userID string) bool
	RejectOldSamplesMaxAge(userID string) time.Duration

	IncrementDuplicateTimestamps(userID string) bool

	ShardStreams(userID string) *shardstreams.Config
	IngestionRateStrategy() string
	IngestionRateBytes(userID string) float64
	IngestionBurstSizeBytes(userID string) int
}

Limits is an interface for distributor limits and related configs.

type RateStore

type RateStore interface {
	RateFor(tenantID string, streamHash uint64) (int64, float64)
}

RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
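As a rough illustration of the interface's shape, the sketch below implements RateFor over an in-memory map, for example as a test double. The type staticRateStore and its set method are hypothetical. The meaning given to the two return values (a byte rate and a push rate) is an assumption, since the documentation above does not name them.

```go
package main

import "sync"

// RateStore mirrors the interface shown in the documentation above.
type RateStore interface {
	RateFor(tenantID string, streamHash uint64) (int64, float64)
}

// streamKey identifies a stream within a tenant.
type streamKey struct {
	tenant string
	hash   uint64
}

// streamRate holds the two values returned by RateFor.
// Assumption: the first is a byte rate and the second a push rate.
type streamRate struct {
	rate   int64
	pushes float64
}

// staticRateStore is a hypothetical in-memory RateStore; the real store is
// populated by data fetched from ingesters.
type staticRateStore struct {
	mu    sync.RWMutex
	rates map[streamKey]streamRate
}

func newStaticRateStore() *staticRateStore {
	return &staticRateStore{rates: make(map[streamKey]streamRate)}
}

// set records the rates for one stream.
func (s *staticRateStore) set(tenant string, hash uint64, rate int64, pushes float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rates[streamKey{tenant, hash}] = streamRate{rate, pushes}
}

// RateFor returns the recorded rates, or zeros for an unknown stream.
func (s *staticRateStore) RateFor(tenantID string, streamHash uint64) (int64, float64) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	r := s.rates[streamKey{tenantID, streamHash}]
	return r.rate, r.pushes
}
```

Keying on both tenant and stream hash keeps equal hashes from different tenants apart.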

type RateStoreConfig

type RateStoreConfig struct {
	MaxParallelism           int           `yaml:"max_request_parallelism"`
	StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
	IngesterReqTimeout       time.Duration `yaml:"ingester_request_timeout"`
	Debug                    bool          `yaml:"debug"`
}

func (*RateStoreConfig) RegisterFlagsWithPrefix

func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)

type ReadLifecycler

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type RingConfig

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the distributors ring. This config strips the options down to the minimum to avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this RingConfig to the given FlagSet.

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardTracker

type ShardTracker struct {
	// contains filtered or unexported fields
}

ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards.

func NewShardTracker

func NewShardTracker() *ShardTracker

func (*ShardTracker) LastShardNum

func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int

func (*ShardTracker) SetLastShardNum

func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)

type Validator

type Validator struct {
	Limits
}

func NewValidator

func NewValidator(l Limits) (*Validator, error)

func (Validator) ValidateEntry

func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error

ValidateEntry returns an error if the entry is invalid and reports metrics for invalid entries accordingly.

func (Validator) ValidateLabels

func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error

ValidateLabels returns an error if the labels are invalid.
