chunk

package
v0.0.0-...-b14eccf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2017 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
// Errors that decode can return.
// NOTE(review): errors.Error is not part of the standard library's errors
// package; presumably a project-local string-based error type, which is what
// allows these to be declared as constants — confirm.
const (
	ErrInvalidChunkID  = errors.Error("invalid chunk ID")
	ErrInvalidChecksum = errors.Error("invalid chunk checksum")
	ErrWrongMetadata   = errors.Error("wrong chunk metadata")
)

Errors that decoding a chunk can return.

Variables

View Source
var (
	// ErrNoMetricNameNotSupported carries the message that a metric name is
	// required for pre-v7 schemas.
	// NOTE(review): presumably returned for queries that give no metric name
	// against a schema older than v7 — confirm against the schema code.
	ErrNoMetricNameNotSupported = errors.New("metric name required for pre-v7 schemas")
)

Errors

Functions

func ChunksToMatrix

func ChunksToMatrix(chunks []Chunk) (model.Matrix, error)

ChunksToMatrix converts a slice of chunks into a model.Matrix.

Types

type AWSStorageConfig

// AWSStorageConfig specifies config for storing data on AWS.
type AWSStorageConfig struct {
	// Embedded DynamoDB settings (DynamoDB URL and API limit).
	DynamoDBConfig
	// Embedded settings for the periodic chunk tables.
	PeriodicChunkTableConfig

	// S3 is a URL-valued setting.
	// NOTE(review): presumably the S3 location used for chunk storage — confirm.
	S3 util.URLValue
}

AWSStorageConfig specifies config for storing data on AWS.

func (*AWSStorageConfig) RegisterFlags

func (cfg *AWSStorageConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure AWSStorageConfig to the given FlagSet.

type Bucket

// Bucket describes a range of time with a tableName and hashKey.
// All of its fields are unexported.
type Bucket struct {
	// contains filtered or unexported fields
}

Bucket describes a range of time with a tableName and hashKey

type ByKey

// ByKey allows sorting a slice of chunks by ID. Its Len, Less and Swap
// methods have the signatures required by sort.Interface.
type ByKey []Chunk

ByKey allows you to sort chunks by ID.

func (ByKey) Len

func (cs ByKey) Len() int

func (ByKey) Less

func (cs ByKey) Less(i, j int) bool

func (ByKey) Swap

func (cs ByKey) Swap(i, j int)

type Cache

// Cache caches chunks. It is created with NewCache from a CacheConfig and
// should be stopped with Stop, which stops its background flushing goroutines.
type Cache struct {
	// contains filtered or unexported fields
}

Cache type caches chunks

func NewCache

func NewCache(cfg CacheConfig) *Cache

NewCache makes a new Cache

func (*Cache) BackgroundWrite

func (c *Cache) BackgroundWrite(key string, buf []byte)

BackgroundWrite writes chunks to the cache in the background.

func (*Cache) FetchChunkData

func (c *Cache) FetchChunkData(ctx context.Context, chunks []Chunk) (found []Chunk, missing []Chunk, err error)

FetchChunkData gets chunks from the chunk cache.

func (*Cache) Stop

func (c *Cache) Stop()

Stop the background flushing goroutines.

func (*Cache) StoreChunk

func (c *Cache) StoreChunk(ctx context.Context, key string, buf []byte) error

StoreChunk serializes and stores a chunk in the chunk cache.

type CacheConfig

// CacheConfig is the config used to make a Cache (see NewCache).
type CacheConfig struct {
	// NOTE(review): presumably the expiry applied to items written to the
	// cache — confirm.
	Expiration time.Duration
	// NOTE(review): presumably the number of goroutines serving
	// Cache.BackgroundWrite, and the size of the buffer feeding them — confirm.
	WriteBackGoroutines int
	WriteBackBuffer     int
	// contains filtered or unexported fields
}

CacheConfig is config to make a Cache

func (*CacheConfig) RegisterFlags

func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure CacheConfig to the given FlagSet.

type Chunk

// Chunk contains encoded timeseries data together with the metadata
// describing it. Fields tagged `json:"-"` are excluded from the JSON
// encoding of the chunk.
type Chunk struct {
	// These two fields will be missing from older chunks (as will the hash).
	// On fetch we will initialise these fields from the DynamoDB key.
	Fingerprint model.Fingerprint `json:"fingerprint"`
	UserID      string            `json:"userID"`

	// These fields will be in all chunks, including old ones.
	From    model.Time   `json:"from"`
	Through model.Time   `json:"through"`
	Metric  model.Metric `json:"metric"`

	// The hash is not written to the external storage either.  We use
	// crc32, Castagnoli table.  See http://www.evanjones.ca/crc32c.html.
	// For old chunks, ChecksumSet will be false.
	ChecksumSet bool   `json:"-"`
	Checksum    uint32 `json:"-"`

	// We never use Delta encoding (the zero value), so if this entry is
	// missing, we default to DoubleDelta.
	Encoding prom_chunk.Encoding `json:"encoding"`
	// Data is the underlying Prometheus chunk; it is not part of the JSON
	// metadata.
	Data prom_chunk.Chunk `json:"-"`
	// contains filtered or unexported fields
}

Chunk contains encoded timeseries data

func NewChunk

func NewChunk(userID string, fp model.Fingerprint, metric model.Metric, c prom_chunk.Chunk, from, through model.Time) Chunk

NewChunk creates a new chunk

type DynamoDBConfig

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
	// DynamoDB is a URL-valued setting.
	// NOTE(review): presumably the DynamoDB endpoint URL — confirm.
	DynamoDB util.URLValue
	// NOTE(review): presumably a rate limit on DynamoDB API calls; the unit
	// is not visible here — confirm.
	APILimit float64
}

DynamoDBConfig specifies config for a DynamoDB database.

func (*DynamoDBConfig) RegisterFlags

func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure DynamoDBConfig to the given FlagSet.

type IndexEntry

// IndexEntry describes an entry in the chunk index. Its four fields match
// the four arguments of WriteBatch.Add.
type IndexEntry struct {
	// Table and hash key the entry belongs to.
	TableName string
	HashValue string

	// For writes, RangeValue will always be set.
	RangeValue []byte

	// New for v6 schema, label value is not written as part of the range key.
	Value []byte
}

IndexEntry describes an entry in the chunk index

type IndexQuery

// IndexQuery describes a query for entries in the chunk index.
type IndexQuery struct {
	// Table and hash key (row) to query.
	TableName string
	HashValue string

	// One of RangeValuePrefix or RangeValueStart might be set:
	// - If RangeValuePrefix is not nil, must read all keys with that prefix.
	// - If RangeValueStart is not nil, must read all keys from there onwards.
	// - If neither is set, must read all keys for that row.
	RangeValuePrefix []byte
	RangeValueStart  []byte

	// Filters for querying
	// NOTE(review): presumably restricts results to entries whose Value
	// equals ValueEqual when it is set — confirm.
	ValueEqual []byte
}

IndexQuery describes a query for entries

type Memcache

// Memcache caches things. It declares only the two operations shown here:
// a multi-key get and a single-item set.
// NOTE(review): presumably satisfied by *memcache.Client (embedded in
// MemcacheClient) so the cache can be swapped out — confirm.
type Memcache interface {
	// GetMulti looks up several keys at once and returns the items in a map.
	// NOTE(review): presumably keys that are not found are simply absent
	// from the map — confirm.
	GetMulti(keys []string) (map[string]*memcache.Item, error)
	// Set stores a single item.
	Set(item *memcache.Item) error
}

Memcache caches things

type MemcacheClient

// MemcacheClient is a memcache client that gets its server list from SRV
// records, and periodically updates that server list. It embeds
// *memcache.Client, so that client's methods are promoted onto it.
type MemcacheClient struct {
	*memcache.Client
	// contains filtered or unexported fields
}

MemcacheClient is a memcache client that gets its server list from SRV records, and periodically updates that ServerList.

func NewMemcacheClient

func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient

NewMemcacheClient creates a new MemcacheClient that gets its server list from SRV and updates the server list on a regular basis.

func (*MemcacheClient) Stop

func (c *MemcacheClient) Stop()

Stop the memcache client.

type MemcacheConfig

// MemcacheConfig defines how a MemcacheClient should be constructed.
type MemcacheConfig struct {
	// NOTE(review): presumably the host and service names used for the SRV
	// lookup that yields the memcache server list — confirm.
	Host    string
	Service string
	// NOTE(review): presumably the per-operation timeout of the underlying
	// memcache client — confirm.
	Timeout time.Duration
	// NOTE(review): presumably how often the server list is refreshed from
	// SRV records — confirm.
	UpdateInterval time.Duration
}

MemcacheConfig defines how a MemcacheClient should be constructed.

func (*MemcacheConfig) RegisterFlags

func (cfg *MemcacheConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure MemcacheConfig to the given FlagSet.

type MockStorage

// MockStorage is a fake in-memory StorageClient. It also has the
// ListTables, CreateTable, DescribeTable and UpdateTable methods declared by
// TableClient.
type MockStorage struct {
	// contains filtered or unexported fields
}

MockStorage is a fake in-memory StorageClient.

func NewMockStorage

func NewMockStorage() *MockStorage

NewMockStorage creates a new MockStorage.

func (*MockStorage) BatchWrite

func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error

BatchWrite implements StorageClient.

func (*MockStorage) CreateTable

func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error

CreateTable implements StorageClient.

func (*MockStorage) DescribeTable

func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, status string, err error)

DescribeTable implements StorageClient.

func (*MockStorage) GetChunks

func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error)

GetChunks implements StorageClient.

func (*MockStorage) ListTables

func (m *MockStorage) ListTables(_ context.Context) ([]string, error)

ListTables implements StorageClient.

func (*MockStorage) NewWriteBatch

func (m *MockStorage) NewWriteBatch() WriteBatch

NewWriteBatch implements StorageClient.

func (*MockStorage) PutChunks

func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error

PutChunks implements StorageClient.

func (*MockStorage) QueryPages

func (m *MockStorage) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error

QueryPages implements StorageClient.

func (*MockStorage) UpdateTable

func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error

UpdateTable implements StorageClient.

type PeriodicChunkTableConfig

// PeriodicChunkTableConfig contains the various parameters for the chunk
// table.
// NOTE(review): by analogy with PeriodicTableConfig, these presumably are
// the day from which periodic chunk tables are used, the prefix for their
// names, and the length of each period — confirm.
type PeriodicChunkTableConfig struct {
	ChunkTableFrom   util.DayValue
	ChunkTablePrefix string
	ChunkTablePeriod time.Duration
}

PeriodicChunkTableConfig contains the various parameters for the chunk table.

func (*PeriodicChunkTableConfig) RegisterFlags

func (cfg *PeriodicChunkTableConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure PeriodicChunkTableConfig to the given FlagSet.

type PeriodicTableConfig

// PeriodicTableConfig configures the use of periodic tables (i.e. weekly
// tables): when to start using them, how long each period is, and the prefix
// to give the tables.
type PeriodicTableConfig struct {
	// NOTE(review): presumably the single table used before periodic tables
	// start (or when they are disabled) — confirm.
	OriginalTableName string
	// UsePeriodicTables switches periodic tables on.
	UsePeriodicTables bool
	// TablePrefix is the prefix to give the periodic tables.
	TablePrefix string
	// TablePeriod is how long each table's period is.
	TablePeriod time.Duration
	// PeriodicTableStartAt is when to start the periodic tables.
	PeriodicTableStartAt util.DayValue
}

PeriodicTableConfig configures the use of periodic tables (i.e., weekly tables). It controls when to start the periodic tables, how long the period should be, and the prefix to give the tables.

func (*PeriodicTableConfig) RegisterFlags

func (cfg *PeriodicTableConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure PeriodicTableConfig to the given FlagSet.

type ReadBatch

// ReadBatch represents the results of a QueryPages; it is what the
// QueryPages callback receives for each page.
type ReadBatch interface {
	// Len reports the size of the batch.
	// NOTE(review): presumably valid indexes for RangeValue and Value are
	// 0 <= index < Len() — confirm.
	Len() int
	// RangeValue returns the range value of the entry at index.
	RangeValue(index int) []byte
	// Value returns the value of the entry at index.
	Value(index int) []byte
}

ReadBatch represents the results of a QueryPages.

type Schema

// Schema defines methods to calculate the hash and range keys needed to
// write or read chunks from the external index. Every method takes the
// [from, through] time range and the user ID; the read methods differ in how
// much of the metric (name, label name, label value) is given.
type Schema interface {
	// When doing a write, use this method to return the list of entries you should write to.
	GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error)

	// When doing a read, use these methods to return the list of entries you should query
	GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error)
	GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error)
	GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error)
	GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error)
}

Schema interface defines methods to calculate the hash and range keys needed to write or read chunks from the external index.

type SchemaConfig

// SchemaConfig contains the config for our chunk index schemas. Each
// "...From" field is a day value marking when the corresponding behaviour or
// schema version takes effect.
type SchemaConfig struct {
	// Embedded settings for periodic index tables.
	PeriodicTableConfig

	// After midnight on this day, we start bucketing indexes by day instead of by
	// hour.  Only the day matters, not the time within the day.
	DailyBucketsFrom util.DayValue

	// After this time, we will only query for base64-encoded label values.
	Base64ValuesFrom util.DayValue

	// After this time, we will read and write v4 schemas.
	V4SchemaFrom util.DayValue

	// After this time, we will read and write v5 schemas.
	V5SchemaFrom util.DayValue

	// After this time, we will read and write v6 schemas.
	V6SchemaFrom util.DayValue

	// After this time, we will read and write v7 schemas.
	V7SchemaFrom util.DayValue
}

SchemaConfig contains the config for our chunk index schemas

func (*SchemaConfig) RegisterFlags

func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure SchemaConfig to the given FlagSet.

type StorageClient

// StorageClient is a client for the persistent storage for Cortex
// (e.g. DynamoDB + S3). It covers index writes, index reads, and chunk
// storage.
type StorageClient interface {
	// For the write path.
	// NewWriteBatch returns a batch to Add index entries to; BatchWrite
	// writes such a batch.
	NewWriteBatch() WriteBatch
	BatchWrite(context.Context, WriteBatch) error

	// For the read path.
	// The callback receives each page of results along with a lastPage flag
	// and returns whether to continue.
	QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error

	// For storing and retrieving chunks.
	PutChunks(ctx context.Context, chunks []Chunk) error
	GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
}

StorageClient is a client for the persistent storage for Cortex (e.g. DynamoDB + S3).

func NewAWSStorageClient

func NewAWSStorageClient(cfg AWSStorageConfig) (StorageClient, error)

NewAWSStorageClient makes a new AWS-backed StorageClient.

func NewStorageClient

func NewStorageClient(cfg StorageClientConfig) (StorageClient, error)

NewStorageClient makes a storage client based on the configuration.

type StorageClientConfig

// StorageClientConfig chooses which storage client to use; NewStorageClient
// builds the client from it.
type StorageClientConfig struct {
	// StorageClient names the storage client to use.
	// NOTE(review): the accepted values are not visible here — check
	// NewStorageClient.
	StorageClient string
	// Embedded config for the AWS-backed storage client.
	AWSStorageConfig
}

StorageClientConfig chooses which storage client to use.

func (*StorageClientConfig) RegisterFlags

func (cfg *StorageClientConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure StorageClientConfig to the given FlagSet.

type Store

// Store is the chunk store: it is built by NewStore from a StoreConfig and a
// StorageClient, and its Get and Put methods are documented as implementing
// ChunkStore.
type Store struct {
	// contains filtered or unexported fields
}

Store implements ChunkStore

func NewStore

func NewStore(cfg StoreConfig, storage StorageClient) (*Store, error)

NewStore makes a new ChunkStore

func (*Store) Get

func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error)

Get implements ChunkStore

func (*Store) Put

func (c *Store) Put(ctx context.Context, chunks []Chunk) error

Put implements ChunkStore

func (*Store) Stop

func (c *Store) Stop()

Stop any background goroutines (i.e. in the cache).

type StoreConfig

// StoreConfig specifies config for a ChunkStore (see NewStore). It embeds
// the index schema config and the chunk cache config.
type StoreConfig struct {
	SchemaConfig
	CacheConfig
	// contains filtered or unexported fields
}

StoreConfig specifies config for a ChunkStore

func (*StoreConfig) RegisterFlags

func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure StoreConfig to the given FlagSet.

type TableClient

// TableClient is a client for telling Dynamo what to do with tables.
type TableClient interface {
	// ListTables returns a list of strings.
	// NOTE(review): presumably the names of the existing tables — confirm.
	ListTables(ctx context.Context) ([]string, error)
	// CreateTable creates a table from the given description.
	CreateTable(ctx context.Context, desc TableDesc) error
	// DescribeTable returns the description and a status string for the
	// named table.
	DescribeTable(ctx context.Context, name string) (desc TableDesc, status string, err error)
	// UpdateTable takes the current description and the expected one.
	// NOTE(review): presumably it moves the table from current to expected
	// — confirm.
	UpdateTable(ctx context.Context, current, expected TableDesc) error
}

TableClient is a client for telling Dynamo what to do with tables.

func NewDynamoDBTableClient

func NewDynamoDBTableClient(cfg DynamoDBConfig) (TableClient, error)

NewDynamoDBTableClient makes a new DynamoTableClient.

type TableDesc

// TableDesc describes a table. Two descriptions can be compared with Equals.
type TableDesc struct {
	Name string
	// NOTE(review): presumably the provisioned read/write throughput of the
	// table (TableManager is documented as managing provisioned throughput
	// on DynamoDB tables) — confirm units.
	ProvisionedRead  int64
	ProvisionedWrite int64
	// Tags attached to the table.
	Tags Tags
}

TableDesc describes a table.

func (TableDesc) Equals

func (desc TableDesc) Equals(other TableDesc) bool

Equals returns true if other matches desc.

type TableManager

// TableManager creates and manages the provisioned throughput on DynamoDB
// tables. It is built by NewTableManager from a TableManagerConfig and a
// TableClient, and is controlled with Start and Stop.
type TableManager struct {
	// contains filtered or unexported fields
}

TableManager creates and manages the provisioned throughput on DynamoDB tables

func NewTableManager

func NewTableManager(cfg TableManagerConfig, tableClient TableClient) (*TableManager, error)

NewTableManager makes a new TableManager

func (*TableManager) Start

func (m *TableManager) Start()

Start the TableManager

func (*TableManager) Stop

func (m *TableManager) Stop()

Stop the TableManager

type TableManagerConfig

// TableManagerConfig is the config for a TableManager.
type TableManagerConfig struct {
	// NOTE(review): presumably how often the manager polls DynamoDB — confirm.
	DynamoDBPollInterval time.Duration

	// Embedded settings for the periodic index tables and the periodic
	// chunk tables.
	PeriodicTableConfig
	PeriodicChunkTableConfig

	// CreationGracePeriod is how long before it is needed a table will be
	// created.
	CreationGracePeriod time.Duration
	// NOTE(review): the remaining fields presumably give the age after which
	// a table counts as inactive (MaxChunkAge) and the throughput to
	// provision for active vs. inactive tables — confirm against the
	// TableManager code.
	MaxChunkAge                time.Duration
	ProvisionedWriteThroughput int64
	ProvisionedReadThroughput  int64
	InactiveWriteThroughput    int64
	InactiveReadThroughput     int64

	// The same four throughput settings, for the chunk tables.
	ChunkTableProvisionedWriteThroughput int64
	ChunkTableProvisionedReadThroughput  int64
	ChunkTableInactiveWriteThroughput    int64
	ChunkTableInactiveReadThroughput     int64

	// TableTags is a set of tags.
	// NOTE(review): presumably applied to the tables the manager creates
	// (TableDesc carries a Tags field) — confirm.
	TableTags Tags
}

TableManagerConfig is the config for a TableManager

func (*TableManagerConfig) RegisterFlags

func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure TableManagerConfig to the given FlagSet.

type Tags

// Tags is a string-string map that implements flag.Value. Because Set has a
// pointer receiver, it is *Tags (not Tags) that has the full flag.Value
// method set.
type Tags map[string]string

Tags is a string-string map that implements flag.Value.

func (Tags) AWSTags

func (ts Tags) AWSTags() []*dynamodb.Tag

AWSTags converts ts into a []*dynamodb.Tag.

func (Tags) Equals

func (ts Tags) Equals(other Tags) bool

Equals returns true if other matches ts.

func (*Tags) Set

func (ts *Tags) Set(s string) error

Set implements flag.Value

func (Tags) String

func (ts Tags) String() string

String implements flag.Value

type WriteBatch

// WriteBatch represents a batch of writes. Batches are obtained from
// StorageClient.NewWriteBatch and written with StorageClient.BatchWrite.
type WriteBatch interface {
	// Add adds one write to the batch; its arguments correspond to the
	// TableName, HashValue, RangeValue and Value fields of IndexEntry.
	Add(tableName, hashValue string, rangeValue []byte, value []byte)
}

WriteBatch represents a batch of writes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL