ingest

package
v0.0.0-...-2032768 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMissingKafkaAddress    = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic      = errors.New("the Kafka topic has not been configured")
	ErrInvalidConsumePosition = errors.New("the configured consume position is invalid")
)

Functions

func IngesterPartitionID

func IngesterPartitionID(ingesterID string) (int32, error)

IngesterPartitionID returns the ID of the partition owned by the given ingester.

Types

type Config

type Config struct {
	Enabled     bool            `yaml:"enabled"`
	KafkaConfig KafkaConfig     `yaml:"kafka"`
	Migration   MigrationConfig `yaml:"migration"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

Validate validates the config.

type KafkaConfig

type KafkaConfig struct {
	Address      string        `yaml:"address"`
	Topic        string        `yaml:"topic"`
	ClientID     string        `yaml:"client_id"`
	DialTimeout  time.Duration `yaml:"dial_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`

	LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"`
	LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`

	ConsumeFromPositionAtStartup  string        `yaml:"consume_from_position_at_startup"`
	ConsumeFromTimestampAtStartup int64         `yaml:"consume_from_timestamp_at_startup"`
	MaxConsumerLagAtStartup       time.Duration `yaml:"max_consumer_lag_at_startup"`

	AutoCreateTopicEnabled           bool `yaml:"auto_create_topic_enabled"`
	AutoCreateTopicDefaultPartitions int  `yaml:"auto_create_topic_default_partitions"`
}

KafkaConfig holds the generic config for the Kafka backend.

func (*KafkaConfig) RegisterFlags

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)

func (*KafkaConfig) RegisterFlagsWithPrefix

func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*KafkaConfig) Validate

func (cfg *KafkaConfig) Validate() error

type MigrationConfig

type MigrationConfig struct {
	DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}

MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.

func (*MigrationConfig) RegisterFlags

func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)

func (*MigrationConfig) RegisterFlagsWithPrefix

func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type PartitionReader

type PartitionReader struct {
	services.Service
	// contains filtered or unexported fields
}

func NewPartitionReaderForPusher

func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, consumerGroup string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)

func (*PartitionReader) WaitReadConsistency

func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr error)

WaitReadConsistency waits until all data produced up until now has been consumed by the reader.

type Pusher

type Pusher interface {
	PushToStorage(context.Context, *mimirpb.WriteRequest) error
}

type Writer

type Writer struct {
	services.Service
	// contains filtered or unexported fields
}

Writer is responsible for writing incoming data to the ingest storage.

func NewWriter

func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer

func (*Writer) WriteSync

func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string, req *mimirpb.WriteRequest) error

WriteSync writes the input data to the ingest storage. The function blocks until the data has been successfully committed or an error occurs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL