v0.2.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2015 License: MIT, MIT Imports: 25 Imported by: 0




View Source
const (
	// MagicNumber is written as the first 4 bytes of a data file to
	// identify the file as a tsm1 formatted file
	MagicNumber uint32 = 0x16D116D1

	Version byte = 1
View Source
const (
	// BlockFloat64 designates a block encodes float64 values
	BlockFloat64 = 0

	// BlockInt64 designates a block encodes int64 values
	BlockInt64 = 1

	// BlockBool designates a block encodes bool values
	BlockBool = 2

	// BlockString designates a block encodes string values
	BlockString = 3
View Source
const (
	// Format is the file format name of this engine.
	Format = "tsm1"

	// IDsFileExtension is the extension for the file that keeps the compressed map
	// of keys to uint64 IDs.
	IDsFileExtension = "ids"

	// FieldsFileExtension is the extension for the file that stores compressed field
	// encoding data for this db
	FieldsFileExtension = "fields"

	// SeriesFileExtension is the extension for the file that stores the compressed
	// series metadata for series in this db
	SeriesFileExtension = "series"

	// CollisionsFileExtension is the extension for the file that keeps a map of which
	// keys have hash collisions and what their actual IDs are
	CollisionsFileExtension = "collisions"

	// CheckpointExtension is the extension given to files that checkpoint a rewrite or compaction.
	// The checkpoint files are created when a new file is first created. They
	// are removed after the file has been synced and is safe for use. If a file
	// has an associated checkpoint file, it wasn't safely written and both should be removed
	CheckpointExtension = "check"

	// CompactionExtension is the extension given to the file that marks when a compaction has been
	// fully written, but the compacted files have not yet been deleted. It is used for cleanup
	// if the server was not cleanly shutdown before the compacted files could be deleted.
	CompactionExtension = "compact"
View Source
const (
	MaxDataFileSize = 1024 * 1024 * 1024 * 2 // 2GB

	DefaultRotateFileSize = 5 * 1024 * 1024 // 5MB

	DefaultMaxPointsPerBlock = 1000

	// MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall
	MAP_POPULATE = 0x8000
View Source
const (
	// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
	DefaultSegmentSize = 2 * 1024 * 1024

	// WALFileExtension is the file extension we expect for wal segments
	WALFileExtension = "wal"

	WALFilePrefix = "_"


View Source
var ErrWALClosed = fmt.Errorf("WAL closed")


func DecodeBlock

func DecodeBlock(block []byte, vals *[]Value) error

DecodeBlock takes a byte array and will decode into values of the appropriate type based on the block

func NewCombinedEngineCursor

func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor

NewCombinedEngineCursor returns a Cursor that joins wc and ec. Values from wc take precedence over ec when identical timestamps are returned.

func NewDataFile

func NewDataFile(f *os.File) (*dataFile, error)

func NewEngine

func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine

NewEngine returns a new instance of Engine.

func NewMultiFieldCursor

func NewMultiFieldCursor(fields []string, cursors []tsdb.Cursor, ascending bool) tsdb.Cursor

NewMultiFieldCursor returns an instance of Cursor that joins the results of cursors.

func NewTSMReader added in v0.2.1

func NewTSMReader(r io.ReadSeeker) (*tsmReader, error)

func SeriesFieldKey

func SeriesFieldKey(seriesKey, field string) string

SeriesFieldKey combines a series key and field name into a unique string to be hashed to a numeric ID.

func ZigZagDecode

func ZigZagDecode(v uint64) int64

ZigZagDecode converts a previously zigzag encoded uint64 back to an int64

func ZigZagEncode

func ZigZagEncode(x int64) uint64

ZigZagEncode converts an int64 to a uint64 by zig zagging negative and positive values across even and odd numbers. E.g. [0,-1,1,-2] becomes [0, 1, 2, 3]


type BoolDecoder

type BoolDecoder interface {
	Next() bool
	Read() bool
	Error() error

BoolDecoder decodes a series of bools from an in-memory buffer.

func NewBoolDecoder

func NewBoolDecoder(b []byte) BoolDecoder

NewBoolDecoder returns a new instance of BoolDecoder.

type BoolEncoder

type BoolEncoder interface {
	Write(b bool)
	Bytes() ([]byte, error)

BoolEncoder encodes a series of bools to an in-memory buffer.

func NewBoolEncoder

func NewBoolEncoder() BoolEncoder

NewBoolEncoder returns a new instance of BoolEncoder.

type BoolValue

type BoolValue struct {
	// contains filtered or unexported fields

func (*BoolValue) Size

func (b *BoolValue) Size() int

func (*BoolValue) Time

func (b *BoolValue) Time() time.Time

func (*BoolValue) UnixNano

func (b *BoolValue) UnixNano() int64

func (*BoolValue) Value

func (b *BoolValue) Value() interface{}

type EmptyValue

type EmptyValue struct {

func (*EmptyValue) Size

func (e *EmptyValue) Size() int

func (*EmptyValue) Time

func (e *EmptyValue) Time() time.Time

func (*EmptyValue) UnixNano

func (e *EmptyValue) UnixNano() int64

func (*EmptyValue) Value

func (e *EmptyValue) Value() interface{}

type Engine

type Engine struct {

	// HashSeriesField is a function that takes a series key and a field name
	// and returns a hash identifier. It's not guaranteed to be unique.
	HashSeriesField func(key string) uint64

	WAL *Log

	RotateFileSize             uint32
	MaxFileSize                uint32
	SkipCompaction             bool
	CompactionAge              time.Duration
	MinCompactionFileCount     int
	IndexCompactionFullAge     time.Duration
	IndexMinCompactionInterval time.Duration
	MaxPointsPerBlock          int
	// contains filtered or unexported fields

Engine represents a storage engine with compressed blocks.

func (*Engine) Begin

func (e *Engine) Begin(writable bool) (tsdb.Tx, error)

Begin starts a new transaction on the engine.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine.

func (*Engine) Compact

func (e *Engine) Compact(fullCompaction bool) error

Compact will compact data files in the directory into the fewest possible data files they can be combined into

func (*Engine) DataFileCount

func (e *Engine) DataFileCount() int

DataFileCount returns the number of data files in the database

func (*Engine) DecodeAndCombine

func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error)

DecodeAndCombine takes an encoded block from a file, decodes it and interleaves the file values with the values passed in. nextTime and hasFutureBlock indicate whether the file has future encoded blocks, so that this method can know how many of its values can be combined and output in the resulting encoded block.

func (*Engine) DeleteMeasurement

func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error

DeleteMeasurement deletes a measurement and all related series.

func (*Engine) DeleteSeries

func (e *Engine) DeleteSeries(seriesKeys []string) error

DeleteSeries deletes the series from the engine.

func (*Engine) Format

func (e *Engine) Format() tsdb.EngineFormat

Format returns the format type of this engine

func (*Engine) LoadMetadataIndex

func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error

LoadMetadataIndex loads the shard metadata into memory.

func (*Engine) MarkDeletes

func (e *Engine) MarkDeletes(keys []string)

MarkDeletes will mark the given keys for deletion in memory. They will be deleted from data files on the next flush. This is mainly for the WAL to use on startup.

func (*Engine) MarkMeasurementDelete

func (e *Engine) MarkMeasurementDelete(name string)

func (*Engine) Open

func (e *Engine) Open() error

Open opens and initializes the engine.

func (*Engine) Path

func (e *Engine) Path() string

Path returns the path the engine was opened with.

func (*Engine) PerformMaintenance

func (e *Engine) PerformMaintenance()

PerformMaintenance is for periodic maintenance of the store. A no-op for b1

func (*Engine) SeriesCount

func (e *Engine) SeriesCount() (n int, err error)

SeriesCount returns the number of series buckets on the shard.

func (*Engine) SetLogOutput

func (e *Engine) SetLogOutput(w io.Writer)

SetLogOutput is a no-op.

func (*Engine) Write

func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error

func (*Engine) WritePoints

func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error

WritePoints writes metadata and point data into the engine. Returns an error if new points are added to an existing key.

func (*Engine) WriteTo

func (e *Engine) WriteTo(w io.Writer) (n int64, err error)

type FloatDecoder

type FloatDecoder struct {
	// contains filtered or unexported fields

FloatDecoder decodes a byte slice into multiple float64 values

func NewFloatDecoder

func NewFloatDecoder(b []byte) (*FloatDecoder, error)

func (*FloatDecoder) Error

func (it *FloatDecoder) Error() error

func (*FloatDecoder) Next

func (it *FloatDecoder) Next() bool

func (*FloatDecoder) Values

func (it *FloatDecoder) Values() float64

type FloatEncoder

type FloatEncoder struct {
	// contains filtered or unexported fields

FloatEncoder encodes multiple float64s into a byte slice

func NewFloatEncoder

func NewFloatEncoder() *FloatEncoder

func (*FloatEncoder) Bytes

func (s *FloatEncoder) Bytes() ([]byte, error)

func (*FloatEncoder) Finish

func (s *FloatEncoder) Finish()

func (*FloatEncoder) Push

func (s *FloatEncoder) Push(v float64)

type FloatValue

type FloatValue struct {
	// contains filtered or unexported fields

func (*FloatValue) Size

func (f *FloatValue) Size() int

func (*FloatValue) Time

func (f *FloatValue) Time() time.Time

func (*FloatValue) UnixNano

func (f *FloatValue) UnixNano() int64

func (*FloatValue) Value

func (f *FloatValue) Value() interface{}

type IndexEntry added in v0.2.1

type IndexEntry struct {

	// The min and max time of all points stored in the block.
	MinTime, MaxTime time.Time

	// The absolute position in the file where this block is located.
	Offset int64

	// The size in bytes of the block in the file.
	Size uint32

IndexEntry is the index information for a given block in a TSM file.

func (*IndexEntry) Contains added in v0.2.1

func (e *IndexEntry) Contains(t time.Time) bool

Returns true if this IndexEntry may contain values for the given time. The min and max times are inclusive.

func (*IndexEntry) UnmarshalBinary added in v0.2.1

func (e *IndexEntry) UnmarshalBinary(b []byte) error

type IndexWriter

type IndexWriter interface {
	Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
	MarkDeletes(keys []string)
	MarkMeasurementDelete(name string)

IndexWriter is an interface for the indexed database the WAL flushes data to

type Int64Decoder

type Int64Decoder interface {
	Next() bool
	Read() int64
	Error() error

Int64Decoder decodes a byte slice into int64s

func NewInt64Decoder

func NewInt64Decoder(b []byte) Int64Decoder

type Int64Encoder

type Int64Encoder interface {
	Write(v int64)
	Bytes() ([]byte, error)

Int64Encoder encodes int64s into byte slices

func NewInt64Encoder

func NewInt64Encoder() Int64Encoder

type Int64Value

type Int64Value struct {
	// contains filtered or unexported fields

func (*Int64Value) Size

func (v *Int64Value) Size() int

func (*Int64Value) String

func (v *Int64Value) String() string

func (*Int64Value) Time

func (v *Int64Value) Time() time.Time

func (*Int64Value) UnixNano

func (v *Int64Value) UnixNano() int64

func (*Int64Value) Value

func (v *Int64Value) Value() interface{}

type Log

type Log struct {

	// LogOutput is the writer used by the logger.
	LogOutput io.Writer

	// FlushColdInterval is the period of time after which a partition will do a
	// full flush and compaction if it has been cold for writes.
	FlushColdInterval time.Duration

	// SegmentSize is the file size at which a segment file will be rotated
	SegmentSize int

	// FlushMemorySizeThreshold specifies when the log should be forced to be flushed
	FlushMemorySizeThreshold int

	// MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected
	MaxMemorySizeThreshold int

	// IndexWriter is the database series will be flushed to
	IndexWriter IndexWriter

	// LoggingEnabled specifies if detailed logs should be output
	LoggingEnabled bool

	// SkipCache specifies if the wal should immediately write to the index instead of
	// caching data in memory. False by default so we buffer in memory before flushing to index.
	SkipCache bool

	// SkipDurability specifies if the wal should not write the wal entries to disk.
	// False by default which means all writes are durable even when cached before flushing to index.
	SkipDurability bool
	// contains filtered or unexported fields

func NewLog

func NewLog(path string) *Log

func (*Log) Close

func (l *Log) Close() error

Close will finish any flush that is currently in process and close file handles

func (*Log) Cursor

func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor

Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given series. This should only ever be called by the engine cursor method, which will always give it exactly one field.

func (*Log) DeleteMeasurement

func (l *Log) DeleteMeasurement(measurement string, keys []string) error

func (*Log) DeleteSeries

func (l *Log) DeleteSeries(keys []string) error

func (*Log) Flush

func (l *Log) Flush() error

Flush will force a flush of the WAL to the index

func (*Log) LastWriteTime

func (l *Log) LastWriteTime() time.Time

func (*Log) Open

func (l *Log) Open() error

Open opens and initializes the Log. Will recover from previous unclean shutdowns

func (*Log) Path

func (l *Log) Path() string

Path returns the path the log was initialized with.

func (*Log) WritePoints

func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error

type StringDecoder

type StringDecoder interface {
	Next() bool
	Read() string
	Error() error

func NewStringDecoder

func NewStringDecoder(b []byte) (StringDecoder, error)

type StringEncoder

type StringEncoder interface {
	Write(s string)
	Bytes() ([]byte, error)

func NewStringEncoder

func NewStringEncoder() StringEncoder

type StringValue

type StringValue struct {
	// contains filtered or unexported fields

func (*StringValue) Size

func (v *StringValue) Size() int

func (*StringValue) String

func (v *StringValue) String() string

func (*StringValue) Time

func (v *StringValue) Time() time.Time

func (*StringValue) UnixNano

func (v *StringValue) UnixNano() int64

func (*StringValue) Value

func (v *StringValue) Value() interface{}

type TSMIndex added in v0.2.1

type TSMIndex interface {

	// Add records a new block entry for a key in the index.
	Add(key string, minTime, maxTime time.Time, offset int64, size uint32)

	// Entries returns all index entries for a key.
	Entries(key string) []*IndexEntry

	// Entry returns the index entry for the specified key and timestamp.  If no entry
	// matches the key and timestamp, nil is returned.
	Entry(key string, timestamp time.Time) *IndexEntry

	// MarshalBinary returns a byte slice encoded version of the index.
	MarshalBinary() ([]byte, error)

	// UnmarshalBinary populates an index from an encoded byte slice
	// representation of an index.
	UnmarshalBinary(b []byte) error

TSMIndex represents the index section of a TSM file. The index records all blocks, their locations, sizes, min and max times.

func NewDirectIndex added in v0.2.1

func NewDirectIndex() TSMIndex

func NewIndirectIndex added in v0.2.1

func NewIndirectIndex() TSMIndex

type TSMWriter added in v0.2.1

type TSMWriter interface {
	// Write writes a new block for key containing values.  Writes append
	// blocks in the order that the Write function is called.  The caller is
	// responsible for ensuring keys and blocks are sorted appropriately.
	// Values are encoded as a full block.  The caller is responsible for
	// ensuring a fixed number of values are encoded in each block as well as
	// ensuring the Values are sorted. The first and last timestamp values are
	// used as the minimum and maximum values for the index entry.
	Write(key string, values Values) error

	// Close finishes the TSM write streams and writes the index.
	Close() error

TSMWriter writes TSM formatted key and values.

func NewTSMWriter added in v0.2.1

func NewTSMWriter(w io.Writer) (TSMWriter, error)

type TimeDecoder

type TimeDecoder interface {
	Next() bool
	Read() time.Time
	Error() error

TimeDecoder decodes byte slices to time.Time values.

func NewTimeDecoder

func NewTimeDecoder(b []byte) TimeDecoder

type TimeEncoder

type TimeEncoder interface {
	Write(t time.Time)
	Bytes() ([]byte, error)

TimeEncoder encodes time.Time to byte slices.

func NewTimeEncoder

func NewTimeEncoder() TimeEncoder

NewTimeEncoder returns a TimeEncoder

type TimePrecision

type TimePrecision uint8
const (
	Seconds TimePrecision = iota

type Value

type Value interface {
	Time() time.Time
	UnixNano() int64
	Value() interface{}
	Size() int

func NewValue

func NewValue(t time.Time, value interface{}) Value

type Values

type Values []Value

Values represents a time-ascending sorted collection of Value types. The underlying type should be the same across all values, but the interface makes the code cleaner.

func (Values) Deduplicate

func (a Values) Deduplicate() Values

Deduplicate returns a new Values slice with any values that have the same timestamp removed. The Value that appears last in the slice is the one that is kept. The returned slice is in ascending order

func (Values) Encode

func (a Values) Encode(buf []byte) ([]byte, error)

Encode converts the values to a byte slice. If there are no values, this function panics.

func (Values) Len

func (a Values) Len() int

Sort methods

func (Values) Less

func (a Values) Less(i, j int) bool

func (Values) MaxTime

func (a Values) MaxTime() int64

func (Values) MinTime

func (a Values) MinTime() int64

func (Values) Swap

func (a Values) Swap(i, j int)

type WriteLock

type WriteLock struct {
	// contains filtered or unexported fields

WriteLock is a lock that enables locking of ranges between a min and max value. We use this so that flushes from the WAL can occur concurrently along with compactions.

func (*WriteLock) LockRange

func (w *WriteLock) LockRange(min, max int64)

LockRange will ensure an exclusive lock between the min and max values inclusive. Any subsequent calls that have an overlapping range will have to wait until the previous lock is released. A corresponding call to UnlockRange should be deferred.

func (*WriteLock) UnlockRange

func (w *WriteLock) UnlockRange(min, max int64)

UnlockRange will release a previously locked range.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL