Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func IngesterPartitionID ¶
IngesterPartitionID returns the partition ID owned by the given ingester.
Types ¶
type Config ¶
type Config struct { Enabled bool `yaml:"enabled"` KafkaConfig KafkaConfig `yaml:"kafka"` Migration MigrationConfig `yaml:"migration"` }
func (*Config) RegisterFlags ¶
type KafkaConfig ¶
type KafkaConfig struct { Address string `yaml:"address"` Topic string `yaml:"topic"` ClientID string `yaml:"client_id"` DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"` ConsumeFromTimestampAtStartup int64 `yaml:"consume_from_timestamp_at_startup"` MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` }
KafkaConfig holds the generic config for the Kafka backend.
func (*KafkaConfig) RegisterFlags ¶
func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)
func (*KafkaConfig) RegisterFlagsWithPrefix ¶
func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
func (*KafkaConfig) Validate ¶
func (cfg *KafkaConfig) Validate() error
type MigrationConfig ¶
type MigrationConfig struct {
DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}
MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.
func (*MigrationConfig) RegisterFlags ¶
func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)
func (*MigrationConfig) RegisterFlagsWithPrefix ¶
func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type PartitionReader ¶
func NewPartitionReaderForPusher ¶
func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, consumerGroup string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)
func (*PartitionReader) WaitReadConsistency ¶
func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr error)
WaitReadConsistency waits until all data produced up until now has been consumed by the reader.
type Writer ¶
Writer is responsible for writing incoming data to the ingest storage.
func NewWriter ¶
func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer
Click to show internal directories.
Click to hide internal directories.