v0.1.6 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2015 License: MIT Imports: 17 Imported by: 0



Package WAL implements a write ahead log optimized for write throughput that can be put in front of the database index.

The WAL is broken into different partitions. The default number of partitions is 5. Each partition consists of a number of segment files. By default these files will get up to 2MB in size before a new segment file is opened. The files are numbered and start at 1. The number indicates the order in which the files should be read on startup to ensure data is recovered in the same order it was written.

Partitions are flushed and compacted individually. One of the goals with having multiple partitions was to be able to flush only a portion of the WAL at a time.

The WAL does not flush everything in a partition when it comes time. It will only flush series that are over a given threshold (32kb by default). The rest will be written into a new segment file so they can be flushed later. This is like a compaction in an LSM Tree.



View Source
const (
	// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
	DefaultSegmentSize = 2 * 1024 * 1024

	// PartitionCount is the number of partitions in the WAL
	PartitionCount = 5

	// FileExtension is the file extension we expect for wal segments
	FileExtension = "wal"

	// MetaFileExtension is the file extension for the log files of new fields and measurements that get created
	MetaFileExtension = "meta"

	// CompactionExtension is the file extension we expect for compaction files
	CompactionExtension = "CPT"

	// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
	// flushed to the index
	MetaFlushInterval = 10 * time.Minute


View Source
var (
	// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one
	ErrCompactionRunning = errors.New("compaction running")

	// ErrMemoryCompactionDone gets returned if we called to flushAndCompact to free up memory
	// but a compaction has already been done to do so
	ErrMemoryCompactionDone = errors.New("compaction already run to free up memory")

	// CompactSequence is the byte sequence within a segment file that has been compacted
	// that indicates the start of a compaction marker
	CompactSequence = []byte{0xFF, 0xFF}


func MarshalEntry

func MarshalEntry(timestamp int64, data []byte) []byte

marshalCacheEntry encodes the timestamp and data to a single byte slice.

The format of the byte slice is:

uint64 timestamp
[]byte data

func UnmarshalEntry

func UnmarshalEntry(buf []byte) (timestamp int64, data []byte)

unmarshalCacheEntry returns the timestamp and data from an encoded byte slice.


type IndexWriter

type IndexWriter interface {
	// time ascending points where each byte array is:
	//   int64 time
	//   data
	WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error

IndexWriter is an interface for the indexed database the WAL flushes data to

type Log

type Log struct {

	// LogOutput is the writer used by the logger.
	LogOutput io.Writer

	// FlushColdInterval is the period of time after which a partition will do a
	// full flush and compaction if it has been cold for writes.
	FlushColdInterval time.Duration

	// SegmentSize is the file size at which a segment file will be rotated in a partition.
	SegmentSize int64

	// MaxSeriesSize controls when a partition should get flushed to index and compacted
	// if any series in the partition has exceeded this size threshold
	MaxSeriesSize int

	// ReadySeriesSize is the minimum size a series of points must get to before getting flushed.
	ReadySeriesSize int

	// CompactionThreshold controls when a parition will be flushed. Once this
	// percentage of series in a partition are ready, a flush and compaction will be triggered.
	CompactionThreshold float64

	// PartitionSizeThreshold specifies when a partition should be forced to be flushed.
	PartitionSizeThreshold uint64

	// Index is the database that series data gets flushed to once it gets compacted
	// out of the WAL.
	Index IndexWriter

	// EnableLogging specifies if detailed logs should be output
	EnableLogging bool
	// contains filtered or unexported fields

func NewLog

func NewLog(path string) *Log

func (*Log) Close

func (l *Log) Close() error

Close will finish any flush that is currently in process and close file handles

func (*Log) Cursor

func (l *Log) Cursor(key string) tsdb.Cursor

Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given

func (*Log) DeleteSeries

func (l *Log) DeleteSeries(keys []string) error

DeleteSeries will flush the metadata that is in the WAL to the index and remove all series specified from the cache and the segment files in each partition. This will block all writes while a compaction is done against all partitions. This function is meant to be called by bz1 BEFORE it updates its own index, since the metadata is flushed here first.

func (*Log) Flush

func (l *Log) Flush() error

Flush will force a flush on all paritions

func (*Log) LoadMetadataIndex

func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error

LoadMetadatIndex loads the new series and fields files into memory and flushes them to the BoltDB index. This function should be called before making a call to Open()

func (*Log) Open

func (l *Log) Open() error

Open opens and initializes the Log. Will recover from previous unclosed shutdowns

func (*Log) WritePoints

func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error

type Partition

type Partition struct {
	// contains filtered or unexported fields

Partition is a set of files for a partition of the WAL. We use multiple partitions so when compactions occur only a portion of the WAL must be flushed and compacted

func NewPartition

func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error)

func (*Partition) Close

func (p *Partition) Close() error

Close resets the caches and closes the currently open segment file

func (*Partition) Write

func (p *Partition) Write(points []tsdb.Point) error

Write will write a compressed block of the points to the current segment file. If the segment file is larger than the max size, it will roll over to a new file before performing the write. This method will also add the points to the in memory cache

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL