ingester

package
v0.0.0-...-cae4bf4

This package is not in the latest version of its module.

Published: Oct 24, 2020 License: Apache-2.0 Imports: 61 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthWal = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowWal   = fmt.Errorf("proto: integer overflow")
)

Functions

func SegmentRange

func SegmentRange(dir string) (int, int, error)

SegmentRange returns the first and last segment indexes of the WAL in dir. Once https://github.com/prometheus/prometheus/pull/6477 is merged, this function should be removed in favor of the Prometheus implementation.

Types

type ChunkStore

type ChunkStore interface {
	Put(ctx context.Context, chunks []cortex_chunk.Chunk) error
}

ChunkStore is the interface needed to store chunks.

type Config

type Config struct {
	WALConfig        WALConfig             `yaml:"walconfig"`
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler"`

	// Config for transferring chunks. Zero or negative = no retries.
	MaxTransferRetries int `yaml:"max_transfer_retries"`

	// Config for chunk flushing.
	FlushCheckPeriod  time.Duration `yaml:"flush_period"`
	RetainPeriod      time.Duration `yaml:"retain_period"`
	MaxChunkIdle      time.Duration `yaml:"max_chunk_idle_time"`
	MaxStaleChunkIdle time.Duration `yaml:"max_stale_chunk_idle_time"`
	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
	MaxChunkAge       time.Duration `yaml:"max_chunk_age"`
	ChunkAgeJitter    time.Duration `yaml:"chunk_age_jitter"`
	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
	SpreadFlushes     bool          `yaml:"spread_flushes"`

	// Config for metadata purging.
	MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period"`

	RateUpdatePeriod time.Duration `yaml:"rate_update_period"`

	// Use blocks storage.
	BlocksStorageEnabled bool                     `yaml:"-"`
	BlocksStorageConfig  tsdb.BlocksStorageConfig `yaml:"-"`

	// Injected at runtime and read from the distributor config, required
	// to accurately apply global limits.
	ShardByAllLabels bool `yaml:"-"`
	// contains filtered or unexported fields
}

Config for an Ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this Config to the given FlagSet.

type Ingester

type Ingester struct {
	*services.BasicService

	// Prometheus block storage
	TSDBState TSDBState
	// contains filtered or unexported fields
}

Ingester deals with "in flight" chunks. Based on Prometheus 1.x MemorySeriesStorage.

func New

func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error)

New constructs a new Ingester.

func NewForFlusher

func NewForFlusher(cfg Config, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error)

NewForFlusher constructs a new Ingester to be used by the flusher target. Compared to New:

  • Always replays the WAL.
  • Does not start the lifecycler.

func NewV2

func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error)

NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.

func NewV2ForFlusher

func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, error)

NewV2ForFlusher constructs a special version of the ingester used by the Flusher. This ingester does not ingest anything; its only purpose is to react to the Flush method and flush all opened TSDBs when called.

func (*Ingester) AllUserStats

AllUserStats returns ingestion statistics for all users known to this ingester.

func (*Ingester) CheckReady

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is the readiness handler used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester.

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)

FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.

func (*Ingester) LabelNames

LabelNames returns all the label names.

func (*Ingester) LabelValues

LabelValues returns all label values that are associated with a given label name.

func (*Ingester) MetricsForLabelMatchers

MetricsForLabelMatchers returns all the metrics which match a set of matchers.

func (*Ingester) MetricsMetadata

MetricsMetadata returns all the metric metadata of a user.

func (*Ingester) Push

Push implements client.IngesterServer

func (*Ingester) Query

Query implements service.IngesterServer

func (*Ingester) QueryStream

func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error

QueryStream implements service.IngesterServer

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)

ShutdownHandler triggers the following set of operations in order:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks.
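
The ordering above can be sketched with a handler that records each step as it runs. Everything here is hypothetical: the `shutdowner` type, its two step methods, the `/shutdown` path, and the 204 response are illustrative assumptions, not the behavior of the real handler.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// shutdowner is a hypothetical stand-in for the ingester; it only records
// the order in which the shutdown steps run.
type shutdowner struct {
	steps []string
}

func (s *shutdowner) stopAcceptingWrites() { s.steps = append(s.steps, "leave-ring") }
func (s *shutdowner) flushAllChunks()      { s.steps = append(s.steps, "flush") }

// ShutdownHandler runs the two steps in the documented order and then
// replies with an (assumed) 204 No Content.
func (s *shutdowner) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	s.stopAcceptingWrites()
	s.flushAllChunks()
	w.WriteHeader(http.StatusNoContent)
}

// runShutdown drives the handler with an in-memory request and returns the
// response code together with the recorded steps.
func runShutdown() (int, []string) {
	s := &shutdowner{}
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/shutdown", nil)
	s.ShutdownHandler(rec, req)
	return rec.Code, s.steps
}

func main() {
	code, steps := runShutdown()
	fmt.Println(code, steps)
}
```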

func (*Ingester) TransferChunks

func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error

TransferChunks receives all the chunks from another ingester.

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(ctx context.Context) error

TransferOut finds an ingester in PENDING state and transfers our chunks to it. Called as part of the ingester shutdown process.

func (*Ingester) TransferTSDB

func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error

TransferTSDB receives all the file chunks from another ingester and writes them to TSDB directories.

func (*Ingester) UserStats

UserStats returns ingestion statistics for the current user.

type Labels

type Labels struct {
	Fingerprint uint64                                                             `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Labels      []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `` /* 131-byte string literal not displayed */
}

func (*Labels) Descriptor

func (*Labels) Descriptor() ([]byte, []int)

func (*Labels) Equal

func (this *Labels) Equal(that interface{}) bool

func (*Labels) GetFingerprint

func (m *Labels) GetFingerprint() uint64

func (*Labels) GoString

func (this *Labels) GoString() string

func (*Labels) Marshal

func (m *Labels) Marshal() (dAtA []byte, err error)

func (*Labels) MarshalTo

func (m *Labels) MarshalTo(dAtA []byte) (int, error)

func (*Labels) MarshalToSizedBuffer

func (m *Labels) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Labels) ProtoMessage

func (*Labels) ProtoMessage()

func (*Labels) Reset

func (m *Labels) Reset()

func (*Labels) Size

func (m *Labels) Size() (n int)

func (*Labels) String

func (this *Labels) String() string

func (*Labels) Unmarshal

func (m *Labels) Unmarshal(dAtA []byte) error

func (*Labels) XXX_DiscardUnknown

func (m *Labels) XXX_DiscardUnknown()

func (*Labels) XXX_Marshal

func (m *Labels) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Labels) XXX_Merge

func (m *Labels) XXX_Merge(src proto.Message)

func (*Labels) XXX_Size

func (m *Labels) XXX_Size() int

func (*Labels) XXX_Unmarshal

func (m *Labels) XXX_Unmarshal(b []byte) error

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of series an ingester can handle for a specific tenant

func NewLimiter

func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int, shardByAllLabels bool) *Limiter

NewLimiter makes a new in-memory series limiter

func (*Limiter) AssertMaxMetadataPerMetric

func (l *Limiter) AssertMaxMetadataPerMetric(userID string, metadata int) error

AssertMaxMetadataPerMetric checks that the limit has not been reached, compared to the current number of metadata per metric given as input, and returns an error if it has.

func (*Limiter) AssertMaxMetricsWithMetadataPerUser

func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error

AssertMaxMetricsWithMetadataPerUser checks that the limit has not been reached, compared to the current number of metrics with metadata given as input, and returns an error if it has.

func (*Limiter) AssertMaxSeriesPerMetric

func (l *Limiter) AssertMaxSeriesPerMetric(userID string, series int) error

AssertMaxSeriesPerMetric checks that the limit has not been reached, compared to the current number of series given as input, and returns an error if it has.

func (*Limiter) AssertMaxSeriesPerUser

func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error

AssertMaxSeriesPerUser checks that the limit has not been reached, compared to the current number of series given as input, and returns an error if it has.

func (*Limiter) MaxSeriesPerQuery

func (l *Limiter) MaxSeriesPerQuery(userID string) int

MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit.

type Record

type Record struct {
	UserId  string   `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
	Labels  []Labels `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels"`
	Samples []Sample `protobuf:"bytes,3,rep,name=samples,proto3" json:"samples"`
}

func (*Record) Descriptor

func (*Record) Descriptor() ([]byte, []int)

func (*Record) Equal

func (this *Record) Equal(that interface{}) bool

func (*Record) GetLabels

func (m *Record) GetLabels() []Labels

func (*Record) GetSamples

func (m *Record) GetSamples() []Sample

func (*Record) GetUserId

func (m *Record) GetUserId() string

func (*Record) GoString

func (this *Record) GoString() string

func (*Record) Marshal

func (m *Record) Marshal() (dAtA []byte, err error)

func (*Record) MarshalTo

func (m *Record) MarshalTo(dAtA []byte) (int, error)

func (*Record) MarshalToSizedBuffer

func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Record) ProtoMessage

func (*Record) ProtoMessage()

func (*Record) Reset

func (m *Record) Reset()

func (*Record) Size

func (m *Record) Size() (n int)

func (*Record) String

func (this *Record) String() string

func (*Record) Unmarshal

func (m *Record) Unmarshal(dAtA []byte) error

func (*Record) XXX_DiscardUnknown

func (m *Record) XXX_DiscardUnknown()

func (*Record) XXX_Marshal

func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Record) XXX_Merge

func (m *Record) XXX_Merge(src proto.Message)

func (*Record) XXX_Size

func (m *Record) XXX_Size() int

func (*Record) XXX_Unmarshal

func (m *Record) XXX_Unmarshal(b []byte) error

type RecordType

type RecordType byte

RecordType represents the type of the WAL/Checkpoint record.

const (

	// WALRecordSeries is the type for the WAL record based on Prometheus TSDB record for series.
	WALRecordSeries RecordType = 1
	// WALRecordSamples is the type for the WAL record based on Prometheus TSDB record for samples.
	WALRecordSamples RecordType = 2

	// CheckpointRecord is the type for the Checkpoint record based on protos.
	CheckpointRecord RecordType = 3
)

type RingCount

type RingCount interface {
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation that allows counting its members.

type Sample

type Sample struct {
	Fingerprint uint64  `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Timestamp   uint64  `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Value       float64 `protobuf:"fixed64,3,opt,name=value,proto3" json:"value,omitempty"`
}

func (*Sample) Descriptor

func (*Sample) Descriptor() ([]byte, []int)

func (*Sample) Equal

func (this *Sample) Equal(that interface{}) bool

func (*Sample) GetFingerprint

func (m *Sample) GetFingerprint() uint64

func (*Sample) GetTimestamp

func (m *Sample) GetTimestamp() uint64

func (*Sample) GetValue

func (m *Sample) GetValue() float64

func (*Sample) GoString

func (this *Sample) GoString() string

func (*Sample) Marshal

func (m *Sample) Marshal() (dAtA []byte, err error)

func (*Sample) MarshalTo

func (m *Sample) MarshalTo(dAtA []byte) (int, error)

func (*Sample) MarshalToSizedBuffer

func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Sample) ProtoMessage

func (*Sample) ProtoMessage()

func (*Sample) Reset

func (m *Sample) Reset()

func (*Sample) Size

func (m *Sample) Size() (n int)

func (*Sample) String

func (this *Sample) String() string

func (*Sample) Unmarshal

func (m *Sample) Unmarshal(dAtA []byte) error

func (*Sample) XXX_DiscardUnknown

func (m *Sample) XXX_DiscardUnknown()

func (*Sample) XXX_Marshal

func (m *Sample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Sample) XXX_Merge

func (m *Sample) XXX_Merge(src proto.Message)

func (*Sample) XXX_Size

func (m *Sample) XXX_Size() int

func (*Sample) XXX_Unmarshal

func (m *Sample) XXX_Unmarshal(b []byte) error

type Series

type Series struct {
	UserId      string                                                             `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
	Fingerprint uint64                                                             `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Labels      []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `` /* 131-byte string literal not displayed */
	Chunks      []client.Chunk                                                     `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
}

func (*Series) Descriptor

func (*Series) Descriptor() ([]byte, []int)

func (*Series) Equal

func (this *Series) Equal(that interface{}) bool

func (*Series) GetChunks

func (m *Series) GetChunks() []client.Chunk

func (*Series) GetFingerprint

func (m *Series) GetFingerprint() uint64

func (*Series) GetUserId

func (m *Series) GetUserId() string

func (*Series) GoString

func (this *Series) GoString() string

func (*Series) Marshal

func (m *Series) Marshal() (dAtA []byte, err error)

func (*Series) MarshalTo

func (m *Series) MarshalTo(dAtA []byte) (int, error)

func (*Series) MarshalToSizedBuffer

func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Series) ProtoMessage

func (*Series) ProtoMessage()

func (*Series) Reset

func (m *Series) Reset()

func (*Series) Size

func (m *Series) Size() (n int)

func (*Series) String

func (this *Series) String() string

func (*Series) Unmarshal

func (m *Series) Unmarshal(dAtA []byte) error

func (*Series) XXX_DiscardUnknown

func (m *Series) XXX_DiscardUnknown()

func (*Series) XXX_Marshal

func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Series) XXX_Merge

func (m *Series) XXX_Merge(src proto.Message)

func (*Series) XXX_Size

func (m *Series) XXX_Size() int

func (*Series) XXX_Unmarshal

func (m *Series) XXX_Unmarshal(b []byte) error

type Shipper

type Shipper interface {
	Sync(ctx context.Context) (uploaded int, err error)
}

Shipper interface is used to have an easy way to mock it in tests.

type TSDBState

type TSDBState struct {
	// contains filtered or unexported fields
}

TSDBState holds data structures used by the TSDB storage engine

type WAL

type WAL interface {
	// Log marshals the record and writes it into the WAL.
	Log(*WALRecord) error
	// Stop stops all the WAL operations.
	Stop()
}

WAL interface allows us to have a no-op WAL when the WAL is disabled.
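
The no-op idea can be sketched as follows: callers hold a WAL value and never need to check whether the WAL is enabled. The trimmed `WALRecord`, the `noopWAL` and `countingWAL` types, and the `newWAL` constructor are hypothetical stand-ins, not the package's own implementations.

```go
package main

import "fmt"

// WALRecord is a trimmed, hypothetical stand-in for the package's WALRecord.
type WALRecord struct {
	UserID string
}

// WAL mirrors the interface documented above.
type WAL interface {
	Log(*WALRecord) error
	Stop()
}

// noopWAL discards every record; used when the WAL is disabled.
type noopWAL struct{}

func (noopWAL) Log(*WALRecord) error { return nil }
func (noopWAL) Stop()                {}

// countingWAL is a toy "enabled" WAL that only counts what it is given.
type countingWAL struct {
	records int
	stopped bool
}

func (w *countingWAL) Log(r *WALRecord) error {
	if w.stopped {
		return fmt.Errorf("wal stopped")
	}
	w.records++
	return nil
}

func (w *countingWAL) Stop() { w.stopped = true }

// newWAL picks the implementation based on the enabled flag.
func newWAL(enabled bool) WAL {
	if !enabled {
		return noopWAL{}
	}
	return &countingWAL{}
}

func main() {
	for _, enabled := range []bool{false, true} {
		w := newWAL(enabled)
		err := w.Log(&WALRecord{UserID: "tenant-a"})
		w.Stop()
		fmt.Printf("enabled=%v type=%T err=%v\n", enabled, w, err)
	}
}
```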

type WALConfig

type WALConfig struct {
	WALEnabled         bool          `yaml:"wal_enabled"`
	CheckpointEnabled  bool          `yaml:"checkpoint_enabled"`
	Recover            bool          `yaml:"recover_from_wal"`
	Dir                string        `yaml:"wal_dir"`
	CheckpointDuration time.Duration `yaml:"checkpoint_duration"`
	FlushOnShutdown    bool          `yaml:"flush_on_shutdown_with_wal_enabled"`
	// contains filtered or unexported fields
}

WALConfig is config for the Write Ahead Log.

func (*WALConfig) RegisterFlags

func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this WALConfig to the given FlagSet.

type WALRecord

type WALRecord struct {
	UserID  string
	Series  []tsdb_record.RefSeries
	Samples []tsdb_record.RefSample
}

WALRecord is a struct combining the series and samples record.
