Documentation
¶
Index ¶
- Variables
- func ExpectTables(ctx context.Context, client TableClient, expected []config.TableDesc) error
- func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) (chunkID string, labelValue model.LabelValue, err error)
- func QueryKey(q Query) string
- type Bucket
- type BucketClient
- type Bytes
- type CacheEntry
- func (*CacheEntry) Descriptor() ([]byte, []int)
- func (this *CacheEntry) Equal(that interface{}) bool
- func (this *CacheEntry) GoString() string
- func (m *CacheEntry) Marshal() (dAtA []byte, err error)
- func (m *CacheEntry) MarshalTo(dAtA []byte) (int, error)
- func (m *CacheEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*CacheEntry) ProtoMessage()
- func (m *CacheEntry) Reset()
- func (m *CacheEntry) Size() (n int)
- func (this *CacheEntry) String() string
- func (m *CacheEntry) Unmarshal(dAtA []byte) error
- func (m *CacheEntry) XXX_DiscardUnknown()
- func (m *CacheEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *CacheEntry) XXX_Merge(src proto.Message)
- func (m *CacheEntry) XXX_Size() int
- func (m *CacheEntry) XXX_Unmarshal(b []byte) error
- type CardinalityExceededError
- type Client
- type Entry
- type EntryProcessor
- type ExtraTables
- type Query
- type QueryPagesCallback
- type ReadBatch
- func (*ReadBatch) Descriptor() ([]byte, []int)
- func (this *ReadBatch) Equal(that interface{}) bool
- func (m *ReadBatch) GetCardinality() int32
- func (m *ReadBatch) GetEntries() []CacheEntry
- func (m *ReadBatch) GetExpiry() int64
- func (m *ReadBatch) GetKey() string
- func (this *ReadBatch) GoString() string
- func (b ReadBatch) Iterator() ReadBatchIterator
- func (m *ReadBatch) Marshal() (dAtA []byte, err error)
- func (m *ReadBatch) MarshalTo(dAtA []byte) (int, error)
- func (m *ReadBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ReadBatch) ProtoMessage()
- func (m *ReadBatch) Reset()
- func (m *ReadBatch) Size() (n int)
- func (this *ReadBatch) String() string
- func (m *ReadBatch) Unmarshal(dAtA []byte) error
- func (m *ReadBatch) XXX_DiscardUnknown()
- func (m *ReadBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ReadBatch) XXX_Merge(src proto.Message)
- func (m *ReadBatch) XXX_Size() int
- func (m *ReadBatch) XXX_Unmarshal(b []byte) error
- type ReadBatchIterator
- type ReadBatchResult
- type ReadClient
- type Reader
- type SeriesStoreSchema
- type StoreLimits
- type TableClient
- type TableManager
- type TableManagerConfig
- type WriteBatch
- type WriteClient
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthCachingIndexClient = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowCachingIndexClient = fmt.Errorf("proto: integer overflow") )
var ( // ErrNotSupported when a schema doesn't support that particular lookup. ErrNotSupported = errors.New("not supported") ErrMetricNameLabelMissing = errors.New("metric name label missing") )
Functions ¶
func ExpectTables ¶
ExpectTables compares existing tables to an expected set of tables. Exposed for testing,
func ParseChunkTimeRangeValue ¶
func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) ( chunkID string, labelValue model.LabelValue, err error, )
ParseChunkTimeRangeValue returns the chunkID (seriesID since v9) and labelValue for chunk time range values.
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket describes a range of time with a tableName and hashKey
type BucketClient ¶
BucketClient is used to enforce retention on chunk buckets.
type Bytes ¶
type Bytes []byte
Bytes exists to stop proto copying the byte array
type CacheEntry ¶
type CacheEntry struct {
Column Bytes `protobuf:"bytes,1,opt,name=Column,proto3,customtype=Bytes" json:"Column"`
Value Bytes `protobuf:"bytes,2,opt,name=Value,proto3,customtype=Bytes" json:"Value"`
}
func (*CacheEntry) Descriptor ¶
func (*CacheEntry) Descriptor() ([]byte, []int)
func (*CacheEntry) Equal ¶
func (this *CacheEntry) Equal(that interface{}) bool
func (*CacheEntry) GoString ¶
func (this *CacheEntry) GoString() string
func (*CacheEntry) Marshal ¶
func (m *CacheEntry) Marshal() (dAtA []byte, err error)
func (*CacheEntry) MarshalToSizedBuffer ¶
func (m *CacheEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*CacheEntry) ProtoMessage ¶
func (*CacheEntry) ProtoMessage()
func (*CacheEntry) Reset ¶
func (m *CacheEntry) Reset()
func (*CacheEntry) Size ¶
func (m *CacheEntry) Size() (n int)
func (*CacheEntry) String ¶
func (this *CacheEntry) String() string
func (*CacheEntry) Unmarshal ¶
func (m *CacheEntry) Unmarshal(dAtA []byte) error
func (*CacheEntry) XXX_DiscardUnknown ¶
func (m *CacheEntry) XXX_DiscardUnknown()
func (*CacheEntry) XXX_Marshal ¶
func (m *CacheEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CacheEntry) XXX_Merge ¶
func (m *CacheEntry) XXX_Merge(src proto.Message)
func (*CacheEntry) XXX_Size ¶
func (m *CacheEntry) XXX_Size() int
func (*CacheEntry) XXX_Unmarshal ¶
func (m *CacheEntry) XXX_Unmarshal(b []byte) error
type CardinalityExceededError ¶
CardinalityExceededError is returned when the user reads a row that is too large.
func (CardinalityExceededError) Error ¶
func (e CardinalityExceededError) Error() string
type Client ¶
type Client interface {
ReadClient
WriteClient
Stop()
}
Client is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type Entry ¶
type Entry struct {
TableName string
HashValue string
// For writes, RangeValue will always be set.
RangeValue []byte
// New for v6 schema, label value is not written as part of the range key.
Value []byte
}
Entry describes an entry in the chunk index
type EntryProcessor ¶
type EntryProcessor interface {
ProcessIndexEntry(indexEntry Entry) error
// Will this user be accepted by the processor?
AcceptUser(user string) bool
// Called at the end of reading of index entries.
Flush() error
}
EntryProcessor receives index entries from a table.
type ExtraTables ¶
type ExtraTables struct {
TableClient TableClient
Tables []config.TableDesc
}
ExtraTables holds the list of tables that TableManager has to manage using a TableClient. This is useful for managing tables other than Chunk and Index tables.
type Query ¶
type Query struct {
TableName string
HashValue string
// One of RangeValuePrefix or RangeValueStart might be set:
// - If RangeValuePrefix is not nil, must read all keys with that prefix.
// - If RangeValueStart is not nil, must read all keys from there onwards.
// - If neither is set, must read all keys for that row.
// RangeValueStart should only be used for querying Chunk IDs.
// If this is going to change then please take care of func isChunksQuery in pkg/chunk/storage/caching_index_client.go which relies on it.
RangeValuePrefix []byte
RangeValueStart []byte
// Filters for querying
ValueEqual []byte
// If the result of this lookup is immutable or not (for caching).
Immutable bool
}
Query describes a query for entries
type QueryPagesCallback ¶
type QueryPagesCallback func(Query, ReadBatchResult) bool
QueryPagesCallback from an IndexQuery.
func QueryFilter ¶
func QueryFilter(callback QueryPagesCallback) QueryPagesCallback
QueryFilter wraps a callback to ensure the results are filtered correctly; useful for the cache and Bigtable backend, which only ever fetches the whole row.
type ReadBatch ¶
type ReadBatch struct {
Entries []CacheEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries"`
Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
// The time at which the key expires.
Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"`
// The number of entries; used for cardinality limiting.
// entries will be empty when this is set.
Cardinality int32 `protobuf:"varint,4,opt,name=cardinality,proto3" json:"cardinality,omitempty"`
}
func (*ReadBatch) Descriptor ¶
func (*ReadBatch) GetCardinality ¶
func (*ReadBatch) GetEntries ¶
func (m *ReadBatch) GetEntries() []CacheEntry
func (ReadBatch) Iterator ¶
func (b ReadBatch) Iterator() ReadBatchIterator
Iterator implements chunk.ReadBatch.
func (*ReadBatch) MarshalToSizedBuffer ¶
func (*ReadBatch) ProtoMessage ¶
func (*ReadBatch) ProtoMessage()
func (*ReadBatch) XXX_DiscardUnknown ¶
func (m *ReadBatch) XXX_DiscardUnknown()
func (*ReadBatch) XXX_Marshal ¶
func (*ReadBatch) XXX_Unmarshal ¶
type ReadBatchIterator ¶
ReadBatchIterator is an iterator over a ReadBatch.
type ReadBatchResult ¶
type ReadBatchResult interface {
Iterator() ReadBatchIterator
}
ReadBatchResult represents the results of a QueryPages.
type ReadClient ¶
type ReadClient interface {
QueryPages(ctx context.Context, queries []Query, callback QueryPagesCallback) error
}
Client for the read path.
type Reader ¶
type Reader interface {
IndexTableNames(ctx context.Context) ([]string, error)
// Reads a single table from index, and passes individual index entries to the processors.
//
// All entries with the same TableName, HashValue and RangeValue are passed to the same processor,
// and all such entries (with different Values) are passed before index entries with different
// values of HashValue and RangeValue are passed to the same processor.
//
// This allows IndexEntryProcessor to find when values for given Hash and Range finish:
// as soon as new Hash and Range differ from last IndexEntry.
//
// Index entries passed to the same processor arrive sorted by HashValue and RangeValue.
ReadIndexEntries(ctx context.Context, table string, processors []EntryProcessor) error
}
Reader parses index entries and passes them to the IndexEntryProcessor.
type SeriesStoreSchema ¶
type SeriesStoreSchema interface {
// When doing a read, use these methods to return the list of entries you should query
GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]Query, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]Query, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]Query, error)
FilterReadQueries(queries []Query, shard *astmapper.ShardAnnotation) []Query
// returns cache key string and []IndexEntry per bucket, matched in order
GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]Entry, error)
GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]Entry, error)
// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error)
// Returns queries to retrieve all label names of multiple series by id.
GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error)
}
SeriesStoreSchema is a schema used by seriesStore
func CreateSchema ¶
func CreateSchema(cfg config.PeriodConfig) (SeriesStoreSchema, error)
CreateSchema returns the schema defined by the PeriodConfig
func NewSchemaCaching ¶
func NewSchemaCaching(schema SeriesStoreSchema, cacheOlderThan time.Duration) SeriesStoreSchema
type StoreLimits ¶
StoreLimits helps get Limits specific to Queries for Stores
type TableClient ¶
type TableClient interface {
ListTables(ctx context.Context) ([]string, error)
CreateTable(ctx context.Context, desc config.TableDesc) error
DeleteTable(ctx context.Context, name string) error
DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error)
UpdateTable(ctx context.Context, current, expected config.TableDesc) error
Stop()
}
TableClient is a client for telling Dynamo what to do with tables.
type TableManager ¶
TableManager creates and manages the provisioned throughput on DynamoDB tables
func NewTableManager ¶
func NewTableManager(cfg TableManagerConfig, schemaCfg config.SchemaConfig, maxChunkAge time.Duration, tableClient TableClient, objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer, logger log.Logger, ) (*TableManager, error)
NewTableManager makes a new TableManager
func (*TableManager) SyncTables ¶
func (m *TableManager) SyncTables(ctx context.Context) error
SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.
type TableManagerConfig ¶
type TableManagerConfig struct {
// Master 'off-switch' for table capacity updates, e.g. when troubleshooting
ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"`
// Master 'on-switch' for table retention deletions
RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"`
// How far back tables will be kept before they are deleted
RetentionPeriod time.Duration `yaml:"-"`
// This is so that we can accept 1w, 1y in the YAML.
RetentionPeriodModel model.Duration `yaml:"retention_period"`
// Period with which the table manager will poll for tables.
PollInterval time.Duration `yaml:"poll_interval"`
// duration a table will be created before it is needed.
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
IndexTables config.ProvisionConfig `yaml:"index_tables_provisioning"`
ChunkTables config.ProvisionConfig `yaml:"chunk_tables_provisioning"`
}
TableManagerConfig holds config for a TableManager
func (*TableManagerConfig) MarshalYAML ¶
func (cfg *TableManagerConfig) MarshalYAML() (interface{}, error)
MarshalYAML implements the yaml.Marshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) RegisterFlags ¶
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (*TableManagerConfig) UnmarshalYAML ¶
func (cfg *TableManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) Validate ¶
func (cfg *TableManagerConfig) Validate() error
Validate validates the config.
type WriteBatch ¶
type WriteBatch interface {
Add(tableName, hashValue string, rangeValue []byte, value []byte)
Delete(tableName, hashValue string, rangeValue []byte)
}
WriteBatch represents a batch of writes.
type WriteClient ¶
type WriteClient interface {
NewWriteBatch() WriteBatch
BatchWrite(context.Context, WriteBatch) error
}
Client for the write path.