cesium

package module
v0.0.0-...-a6f45d6 Latest Latest
Published: Aug 17, 2022 License: MIT Imports: 30 Imported by: 9

README


Cesium

Cesium extends CockroachDB's Pebble to provide fast storage for time-series data.

Use Case

Cesium is tailored towards a specific class of time-series data:

  1. Regular - samples are taken at specific, known intervals. Cesium will not work well with data that arrives at unpredictable intervals.
  2. High Speed - cesium is happiest at sample rates between 10 Hz and 1 MHz. Although it can work with data at any rate, it will be far slower than other storage engines for low sample rates.

Concepts

The following is an overview of Cesium concepts from an interface perspective. A detailed design RFC is available for those interested in the implementation.

Channels

A channel (cesium.Channel) is a time-ordered collection of samples. It's best to think of a channel as a device (whether physical or virtual) that emits values with the following properties:

  1. Time-ordered
  2. Time-unique - no two values are emitted at the same time.
  3. Constant size - all emitted values have the same number of bytes.
  4. Regular - all values are emitted at a constant interval.

These values are known as samples. Samples can be measurements from a sensor, events from a stream, metrics from a database, or images from a camera (to name a few).
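Because a channel is regular, the time of any sample follows from the time of the first sample and the data rate. The sketch below illustrates that arithmetic using only the standard library; `sampleOffset` is a hypothetical helper and is not part of the cesium API.

```go
package main

import (
	"fmt"
	"time"
)

// sampleOffset returns how long after the first sample the i-th sample of a
// regular channel is emitted, given the channel's rate in Hz.
// Hypothetical helper for illustration only; not part of cesium.
func sampleOffset(rateHz float64, i int) time.Duration {
	// The period is the constant interval between two consecutive samples.
	period := time.Duration(float64(time.Second) / rateHz)
	return time.Duration(i) * period
}

func main() {
	// Print the offsets of the first three samples of a 5 Hz channel.
	for i := 0; i < 3; i++ {
		fmt.Println(i, sampleOffset(5, i))
	}
}
```

Since no timestamp needs to be stored per sample, a regular channel only has to record where a run of samples starts.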

Segments

A segment (cesium.Segment) is a contiguous run of samples (between 1 B and 2.5 MB). Cesium stores all values in a segment sequentially on disk, so writing a few large segments performs better than writing many small ones.

This has implications for durability: data that has not yet been written as a segment is lost in the event of a failure, so smaller, more frequent segments reduce how much data is at risk. It's up to you to weigh performance against durability.
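To get a feel for segment sizing, the sketch below computes how many samples a segment holds and how much time it covers. It uses only the standard library; `sampleCount` and `segmentSpan` are hypothetical helpers, and the sample size and rate are illustrative values rather than anything cesium prescribes.

```go
package main

import (
	"fmt"
	"time"
)

// sampleCount returns how many samples a segment holds, given the length of
// its data in bytes and the size of one sample in bytes.
// Hypothetical helper for illustration only; not part of cesium.
func sampleCount(dataLen, sampleSize int) (int, error) {
	if sampleSize <= 0 || dataLen%sampleSize != 0 {
		return 0, fmt.Errorf("data length %d is not a multiple of sample size %d", dataLen, sampleSize)
	}
	return dataLen / sampleSize, nil
}

// segmentSpan returns the time covered by n samples emitted at rateHz.
func segmentSpan(n int, rateHz float64) time.Duration {
	period := time.Duration(float64(time.Second) / rateHz)
	return time.Duration(n) * period
}

func main() {
	// Three float64 samples (8 bytes each) recorded at 5 Hz.
	n, err := sampleCount(24, 8)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(n, segmentSpan(n, 5))
}
```

The span of a segment is also the amount of data at risk if a failure happens before the segment is written.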

Data Types

Cesium samples can be of any data type that can be serialized to a byte array. Cesium has a few built-in data types:

cesium.Float64
cesium.Float32
cesium.Int64
cesium.Int32
cesium.Int16
cesium.Int8
cesium.Uint64
cesium.Uint32
cesium.Uint16
cesium.Uint8

Custom Data Types

Defining a custom data type is as simple as defining a constant of type cesium.DataType with its size in bytes:

package main

import (
	"github.com/arya-analytics/cesium"
)

// TenByTenImage is a custom data type where each sample is 10 * 10 * 3 bytes in size.
const TenByTenImage cesium.DataType = 10 * 10 * 3
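Since a data type is described only by its size in bytes, a segment's data can be cut back into individual samples by slicing at multiples of that size. The following is a minimal sketch of that idea using the standard library; `splitSamples` is a hypothetical helper, not a cesium function.

```go
package main

import "fmt"

// tenByTenImageSize mirrors the 10 * 10 * 3 byte sample size used above.
const tenByTenImageSize = 10 * 10 * 3

// splitSamples cuts a flat byte slice into fixed-size samples. The returned
// slices share memory with data. Hypothetical helper; not part of cesium.
func splitSamples(data []byte, size int) ([][]byte, error) {
	if size <= 0 || len(data)%size != 0 {
		return nil, fmt.Errorf("data length %d is not a multiple of sample size %d", len(data), size)
	}
	samples := make([][]byte, 0, len(data)/size)
	for i := 0; i < len(data); i += size {
		samples = append(samples, data[i:i+size])
	}
	return samples, nil
}

func main() {
	// Two images stored back to back.
	data := make([]byte, 2*tenByTenImageSize)
	samples, err := splitSamples(data, tenByTenImageSize)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(samples), len(samples[0]))
}
```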

Production Readiness

Cesium is in Alpha state, and is not ready for production use.

Installation

go get github.com/arya-analytics/cesium

Getting Started

Writing Samples

The following is a simple, synchronous example of writing samples to a channel.

package main

import (
	"context"
	"github.com/arya-analytics/cesium"
	"log"
)

func main() {
	ctx := context.Background()

	// Open a DB whose files live in the "testdata" directory.
	db, err := cesium.Open("testdata")
	if err != nil {
		log.Fatal(err)
	}

	const (
		dataType = cesium.Float64
		dataRate = 5 * cesium.Hz
	)

	// Create a new channel whose samples are float64 values recorded at 5 Hz.
	ch, err := db.NewCreateChannel().WithType(dataType).WithRate(dataRate).Exec(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Create a new Segment to write. If you don't know what segments are, 
	// check out the Segment documentation.
	segments := []cesium.Segment{
		{
			ChannelKey: ch.Key,
			Start:      cesium.Now(),
			Data:       cesium.MarshalFloat64([]float64{1.0, 2.0, 3.0}),
		},
	}

	// Open the query. DB.Sync is a helper that turns a typically async write 
	// into an acknowledged, synchronous write.
	if err := db.Sync(ctx, db.NewCreate().WhereChannels(ch.Key), &segments); err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

const (
	TimeStampMin = telem.TimeStampMin
	TimeStampMax = telem.TimeStampMax
	TimeSpanMin  = telem.TimeSpanZero
	TimeSpanMax  = telem.TimeSpanMax
	Kilobytes    = telem.Kilobytes
	Float64      = telem.Float64
	Float32      = telem.Float32
	Int64        = telem.Int64
	Int32        = telem.Int32
	Int16        = telem.Int16
	Int8         = telem.Int8
	Uint64       = telem.Uint64
	Uint32       = telem.Uint32
	Uint16       = telem.Uint16
	Uint8        = telem.Uint8
	Microsecond  = telem.Microsecond
	Millisecond  = telem.Millisecond
	Second       = telem.Second
	Minute       = telem.Minute
	Hour         = telem.Hour
	Hz           = telem.Hz
	KHz          = telem.KHz
)

Variables

var (
	// NotFound is returned when a channel or a range of data cannot be found in the DB.
	NotFound = query.NotFound
	// UniqueViolation is returned when a provided channel key already exists in the DB.
	UniqueViolation = query.UniqueViolation
)
var (
	TimeRangeMax = telem.TimeRangeMax
)

Functions

func MarshalFloat64

func MarshalFloat64(data []float64) []byte
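The page does not document the byte layout MarshalFloat64 produces. As a rough illustration of what serializing float64 samples to a byte slice involves, the sketch below packs each value into 8 bytes with the standard library. The little-endian byte order and the function names are assumptions made for this example and may not match cesium's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// marshalFloat64 packs each value into 8 bytes.
// Assumption: little-endian order; cesium's actual layout is not documented here.
func marshalFloat64(data []float64) []byte {
	b := make([]byte, 8*len(data))
	for i, v := range data {
		binary.LittleEndian.PutUint64(b[i*8:], math.Float64bits(v))
	}
	return b
}

// unmarshalFloat64 reverses marshalFloat64. Trailing bytes that do not form
// a full 8-byte sample are ignored.
func unmarshalFloat64(b []byte) []float64 {
	out := make([]float64, len(b)/8)
	for i := range out {
		out[i] = math.Float64frombits(binary.LittleEndian.Uint64(b[i*8:]))
	}
	return out
}

func main() {
	b := marshalFloat64([]float64{1.0, 2.0, 3.0})
	fmt.Println(len(b), unmarshalFloat64(b))
}
```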

Types

type Channel

type Channel = channel.Channel

type ChannelKey

type ChannelKey = channel.Key

type Create

type Create struct {
	query.Query
	// contains filtered or unexported fields
}

func (Create) Stream

func (c Create) Stream(ctx context.Context) (chan<- CreateRequest, <-chan CreateResponse, error)

Stream opens the stream.

func (Create) WhereChannels

func (c Create) WhereChannels(keys ...channel.Key) Create

WhereChannels sets the channels to acquire a lock on for creation. The request stream will only accept segments bound to channels with the given keys. If no keys are provided, the query will return an ErrInvalidQuery error.

type CreateRequest

type CreateRequest struct {
	Segments []segment.Segment
}

CreateRequest is a request containing a set of segments (segment) to write to the DB.

type CreateResponse

type CreateResponse struct {
	Error error
}

CreateResponse contains any errors that occurred during the execution of the Create Query.
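The request/response channel pair returned by Create.Stream follows a common Go pattern: send requests, close the request channel, then drain responses until the response channel closes. The sketch below reproduces that pattern with an in-memory writer and the standard library only. Every name in it (`request`, `response`, `openStream`, `writeSync`) is hypothetical, and it does not reflect how cesium implements Create or DB.Sync.

```go
package main

import (
	"errors"
	"fmt"
)

// request and response are stand-ins for CreateRequest and CreateResponse.
type request struct{ values []float64 }
type response struct{ err error }

// openStream starts a writer goroutine that appends requests to store and
// reports failures on the response channel. The response channel is closed
// once the request channel is closed and all requests are processed.
func openStream(store *[]float64) (chan<- request, <-chan response) {
	req := make(chan request)
	res := make(chan response)
	go func() {
		defer close(res)
		for r := range req {
			if len(r.values) == 0 {
				res <- response{err: errors.New("empty request")}
				continue
			}
			*store = append(*store, r.values...)
		}
	}()
	return req, res
}

// writeSync turns the asynchronous stream into a blocking call: it sends all
// batches, closes the request channel, and waits for the writer to finish.
func writeSync(store *[]float64, batches ...[]float64) error {
	req, res := openStream(store)
	go func() {
		defer close(req)
		for _, b := range batches {
			req <- request{values: b}
		}
	}()
	var first error
	for r := range res {
		if r.err != nil && first == nil {
			first = r.err
		}
	}
	return first
}

func main() {
	var store []float64
	err := writeSync(&store, []float64{1, 2}, []float64{3})
	fmt.Println(store, err)
}
```

Sending from a separate goroutine while the caller drains responses avoids a deadlock when the writer reports an error before all requests have been sent.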

type DB

type DB interface {

	// NewCreate opens a new Create query that is used for writing data to the DB.
	// A simple, synchronous create query looks like the following:
	//
	//		// Open a DB with a memory backed file system.
	//		ctx := context.Background()
	//		db, err := cesium.Open("", cesium.MemBacked())
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		// Create a new channel that samples five float64 values per second. Cesium
	//		// will automatically generate a sequential uint16 key for the channel.
	//		// It is possible to specify a custom, UNIQUE key for the channel.
	//		key, err := db.CreateChannel(cesium.Channel{
	//			DataType: cesium.Float64,
	//			DataRate: 5 * cesium.Hz,
	//		})
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		// Create a new segment to write. If you don't know what segments are,
	//		// check out the cesium.Segment documentation.
	//		segments := []cesium.Segment{
	//			{
	//				ChannelKey: key,
	//				Start:      cesium.Now(),
	//				Data:       cesium.MarshalFloat64([]float64{1.0, 2.0, 3.0}),
	//			},
	//		}
	//
	//		// db.Sync is a helper that turns a typically asynchronous write into an
	//		// acknowledged, synchronous write.
	//		err = db.Sync(ctx, db.NewCreate().WhereChannels(key), &segments)
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	// The above example will create a new channel with the type Float64 and a data rate of 5 Hz.
	// It will then write a segment with 3 samples to the database.
	//
	// The Create query acquires a write lock on the channels provided to the query
	// (in Create.WhereChannels). No other goroutines can write to the locked channels
	// until the query completes.
	//
	// Asynchronous Create queries are the default in cesium to allow for network
	// optimization and multi-segment write locks. They are a bit more complex to
	// write, however. See the following example:
	//
	//		// Assuming DB is opened, channel is created, and a segment is defined.
	//		// See above example for details.
	//
	//		// Open the create query for the channel. The first return value
	//		// is a channel where segments are written. The second is a response channel
	//		// containing any errors encountered during writes. The last value is an
	//		// error value that is returned if the query fails to open properly.
	//		// See Create.Stream for details on what each return value does.
	//		req, res, err := db.NewCreate().WhereChannels(key).Stream(ctx)
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		// Write the segment to the request stream.
	//		req <- cesium.CreateRequest{Segments: segments}
	//
	//		// Close the request stream. This lets the query know it's safe to shut down
	//		// operations and release locks.
	//		close(req)
	//
	//		// Wait for the Create query to acknowledge all writes. The Create query
	//		// will close the response channel when all segments are durable.
	//		for resV := range res {
	//			if resV.Error != nil {
	//				log.Fatal(resV.Error)
	//			}
	//		}
	//
	//		// Do what you want, but remember to close the database when done.
	//
	// Although waiting for the response channel to close is a common pattern for
	// Create queries, it is not required. Cesium will ensure all writes are
	// acknowledged upon DB.Close.
	//
	// It's important to note that the Create query does NOT operate in an atomic
	// manner. It is possible for certain segments to be persisted while others are not.
	NewCreate() Create

	// NewRetrieve opens a new Retrieve query that is used for retrieving data from the DB. A simple, synchronous
	// retrieve query looks like the following:
	//
	//		// Open the DB, create a channel, and write some data to it. See NewCreate
	//		// for details on how to accomplish this.
	//
	//		// Open the Retrieve query, and read the results from disk into res.
	//		// DB.Sync is a helper that turns an asynchronous retrieve into a
	//		// synchronous one.
	//		var res []cesium.Segment
	//		q := db.NewRetrieve().WhereChannels(key)
	//		if err := db.Sync(ctx, q, &res); err != nil {
	//			log.Fatal(err)
	//		}
	//
	// The above example retrieves all data from the channel and binds it into res.
	// It's possible to retrieve a subset of data by time range by using the
	// Retrieve.WhereTimeRange method.
	//
	// Notes on Segmentation of data:
	//
	//		Retrieve results are returned as segments (segment). The segments are not
	//		guaranteed to be in chronological order. This is a performance optimization
	//		to allow for more efficient data retrieval.
	//
	//		Segments are also not guaranteed to be contiguous. Because of the Create
	//		pattern cesium uses, it's possible to leave time gaps between segments
	//		(these represent times when a particular sensor/emitter was inactive).
	//
	// Retrieve may return the following errors upon opening:
	//
	//		NotFound - The channel was not found in the database or the time range
	//		provided contained no data.
	//
	// Asynchronous Retrieve queries are the default in cesium. This allows for
	// network optimization (i.e. send the data across the network as you read more
	// data from IO). They are a bit more complex to write, however. See the
	// following example:
	//
	//		// Assuming DB is opened and two segments have been created for a channel
	//		// with key 'key'. See NewCreate for details on how to accomplish this.
	//
	//		// Start the retrieve query. Retrieve.Stream writes segments read from
	//		// disk to the provided channel and returns any error encountered
	//		// during query open. To cancel a query abruptly, cancel the context
	//		// provided to the Retrieve.Stream method.
	//		ctx, cancel := context.WithCancel(context.Background())
	//		defer cancel()
	//		stream := make(chan cesium.RetrieveResponse)
	//		err := db.NewRetrieve().
	//			WhereTimeRange(cesium.TimeRangeMax).
	//			WhereChannels(key).
	//			Stream(ctx, stream)
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		var res []cesium.Segment
	//
	//		// Retrieve will close the response channel when it has finished reading
	//		// all segments from disk.
	//		for resV := range stream {
	//			if resV.Error != nil {
	//				log.Fatal(resV.Error)
	//			}
	//			res = append(res, resV.Segments...)
	//		}
	//
	//      // do what you want with the data, just remember to close the database when done.
	//
	// It's also possible to iterate over the data in a set of channels. To do this,
	// call Retrieve.Iterate on the query instead of Retrieve.Stream:
	//
	//		iter := db.NewRetrieve().
	//			WhereTimeRange(cesium.TimeRangeMax).
	//			WhereChannels(key).
	//			Iterate()
	//
	//		// It's important to check for errors before proceeding.
	//		if err := iter.Error(); err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		// Open a stream with a buffer value of ten and pipe values from
	//		// the iterator to it.
	//		stream := confluence.NewStream[cesium.RetrieveResponse](10)
	//		iter.OutTo(stream)
	//
	//		// Start a goroutine that reads data returned from the iterator.
	//		go func() {
	//			for res := range stream.Output() {
	//				if res.Error != nil {
	//					log.Fatal(res.Error)
	//				}
	//				// Do something with the data.
	//				sendOverTheNetwork(res.Segments)
	//			}
	//		}()
	//
	//		// Seek to the start of the iterator.
	//		iter.SeekFirst()
	//
	//		// Read the next 15 minutes of data. valid will be true if any data exists
	//		// within the span. valid will be false if any errors are encountered.
	//		valid := iter.NextSpan(15 * cesium.Minute)
	//
	//		// Close the iterator. This will close any response channels bound by
	//		// calling OutTo.
	//		if err := iter.Close(); err != nil {
	//			log.Fatal(err)
	//		}
	//
	//
	NewRetrieve() Retrieve

	// CreateChannel creates a new channel in the DB. Creating a channel is simple:
	//
	//		// Open the DB.
	//		db, err := cesium.Open("", cesium.MemBacked())
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//
	//		// Create a channel. The generated key can be used to write data to and
	//		// retrieve data from the channel.
	//		key, err := db.CreateChannel(cesium.Channel{
	//			DataRate: 5 * cesium.Hz,
	//			DataType: cesium.Float64,
	//		})
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//		fmt.Println(key)
	//		// output:
	//		//  1
	//
	// If the Channel.Key field is not set, the DB will automatically generate
	// an auto-incrementing uint16 key for you. Cesium requires that all channels have
	// a unique key. If you attempt to create a channel with a duplicate key, the DB
	// will return a UniqueViolation error.
	CreateChannel(ch Channel) (ChannelKey, error)

	// RetrieveChannel retrieves information about the channels with the specified keys
	// from the DB. Retrieving a channel is simple:
	//
	//		// Assuming DB is opened and a channel with key 1 has been created.
	//		// See CreateChannel for details on how to accomplish this.
	//
	//		// Retrieve the channel.
	//		chs, err := db.RetrieveChannel(1)
	//		if err != nil {
	//			log.Fatal(err)
	//		}
	//		fmt.Println(chs[0].Key)
	//		// output:
	//		//  1
	//
	// If any of the channels with the provided keys cannot be found, DB will return
	// a NotFound error.
	RetrieveChannel(keys ...ChannelKey) ([]Channel, error)

	// Sync is a utility that executes a query synchronously. It is useful for operations
	// that require all data to be persisted/returned before continuing.
	//
	// Sync only supports Create and Retrieve queries, and will panic if any other
	// entity is passed. In the case of a Create query, the 'segments' arg represents
	// the data to write to the DB. A Retrieve query will do the reverse,
	// binding returned data into 'segments' instead.
	//
	// For examples on how to use Sync, see the documentation for NewCreate and NewRetrieve.
	Sync(ctx context.Context, query interface{}, segments *[]Segment) error

	// Close closes the DB. Close ensures that all queries are complete and all data
	// is persisted to disk. Close will block until all queries are completed,
	// so make sure to stop any running queries before calling.
	Close() error
}

func Open

func Open(dirname string, opts ...Option) (DB, error)

Open opens a new DB whose files are stored in the given directory. DB can be opened with a variety of options:

	// Open a DB in memory.
	cesium.MemBacked()

	// Open a DB with the provided logger.
	cesium.WithLogger(zap.NewNop())

	// Bind an alamos.Experiment to register DB metrics.
	cesium.WithExperiment(alamos.WithCancel("myExperiment"))

	// Override the default shutdown threshold.
	cesium.WithShutdownThreshold(time.Second)

	// Set custom shutdown options.
	cesium.WithShutdownOptions()

See each option's documentation for more.

type DataRate

type DataRate = telem.DataRate

type DataType

type DataType = telem.DataType

type Density

type Density = telem.Density

type Option

type Option func(*options)

func MemBacked

func MemBacked() Option

func WithExperiment

func WithExperiment(exp alamos.Experiment) Option

func WithFS

func WithFS(vfs vfs.FS, baseKFS kfs.BaseFS) Option

func WithKVEngine

func WithKVEngine(kv kv.DB) Option

func WithLogger

func WithLogger(logger *zap.Logger) Option

type Retrieve

type Retrieve struct {
	query.Query
	// contains filtered or unexported fields
}

func (Retrieve) Iterate

func (r Retrieve) Iterate() StreamIterator

func (Retrieve) Stream

func (r Retrieve) Stream(ctx context.Context, res chan<- RetrieveResponse) (err error)

Stream streams all segments from the iterator out to the channel. Errors encountered during stream construction are returned immediately. Errors encountered during segment reads are returned as part of RetrieveResponse.

func (Retrieve) WhereChannels

func (r Retrieve) WhereChannels(keys ...ChannelKey) Retrieve

WhereChannels sets the channels to retrieve data for. If no keys are provided, the query will return an ErrInvalidQuery error.

func (Retrieve) WhereTimeRange

func (r Retrieve) WhereTimeRange(tr TimeRange) Retrieve

WhereTimeRange sets the time range to retrieve data from.

type RetrieveResponse

type RetrieveResponse struct {
	Segments []segment.Segment
}

RetrieveResponse is a response containing segments satisfying a Retrieve Query as well as any errors encountered during the retrieval.

type Segment

type Segment = segment.Segment

type Size

type Size = telem.Size

type StreamIterator

type StreamIterator interface {
	// Source is the outlet for the StreamIterator values. All segments read from disk
	// are piped to the Source outlet. StreamIterator should be the ONLY entity writing
	// to the Source outlet (StreamIterator.Close will close the Source outlet).
	confluence.Source[RetrieveResponse]
	// Next pipes the next segment in the StreamIterator to the Source outlet.
	// It returns true if the StreamIterator is pointing to a valid segment.
	Next() bool
	// Prev pipes the previous segment in the StreamIterator to the Source outlet.
	// It returns true if the StreamIterator is pointing to a valid segment.
	Prev() bool
	// First seeks to the first segment in the StreamIterator. Returns true
	// if the StreamIterator is pointing to a valid segment.
	First() bool
	// Last seeks to the last segment in the StreamIterator. Returns true
	// if the StreamIterator is pointing to a valid segment.
	Last() bool
	// NextSpan pipes all segments in the StreamIterator from the current position to
	// the end of the span. It returns true if the StreamIterator is pointing to a
	// valid segment. If span is TimeSpanMax, it will exhaust the StreamIterator. If
	// span is TimeSpanZero, it won't do anything.
	NextSpan(span TimeSpan) bool
	// PrevSpan pipes all segments in the StreamIterator from the current position
	// back to the start of the span.
	PrevSpan(span TimeSpan) bool
	// NextRange seeks the StreamIterator to the start of the provided range and pipes all segments bound by it
	// to the Source outlet. It returns true if the StreamIterator is pointing to a valid segment.
	// If range is TimeRangeMax, exhausts the StreamIterator. If range is TimeRangeZero, it won't do anything.
	NextRange(tr telem.TimeRange) bool
	// SeekFirst seeks the iterator to the first segment in the range.
	SeekFirst() bool
	// SeekLast seeks the iterator to the last segment in the range.
	SeekLast() bool
	// SeekLT seeks the StreamIterator to the first segment with a timestamp less than the provided timestamp.
	// It returns true if the StreamIterator is pointing to a valid segment.
	SeekLT(time TimeStamp) bool
	// SeekGE seeks the StreamIterator to the first segment with a timestamp greater than or equal to the provided timestamp.
	// It returns true if the StreamIterator is pointing to a valid segment.
	SeekGE(time TimeStamp) bool
	// View returns the current range of values the StreamIterator has a 'view' of.  This view represents the range of
	// segments most recently returned to the caller.
	View() TimeRange
	// Close closes the StreamIterator, ensuring that all in-progress segment reads complete before closing the Source outlet.
	Close() error
	// Valid returns true if the StreamIterator is pointing at valid segments.
	Valid() bool
	// Error returns any errors accumulated by the StreamIterator.
	Error() error
}
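The seek methods above resemble the positioning calls of an ordered key-value iterator. As a rough sketch of the idea, the code below positions an index over a sorted list of segment start timestamps using binary search from the standard library. The function names are hypothetical, and the reading of SeekLT as "the closest segment starting before the timestamp" is an assumption of this example; the actual StreamIterator may behave differently.

```go
package main

import (
	"fmt"
	"sort"
)

// seekGE returns the index of the first start timestamp that is greater than
// or equal to ts, and whether such a segment exists.
// starts must be sorted in ascending order. Hypothetical helper.
func seekGE(starts []int64, ts int64) (int, bool) {
	i := sort.Search(len(starts), func(i int) bool { return starts[i] >= ts })
	return i, i < len(starts)
}

// seekLT returns the index of the closest start timestamp that is strictly
// less than ts, and whether such a segment exists.
// Assumption: "less than" means the nearest earlier segment.
func seekLT(starts []int64, ts int64) (int, bool) {
	i := sort.Search(len(starts), func(i int) bool { return starts[i] >= ts }) - 1
	return i, i >= 0
}

func main() {
	starts := []int64{10, 20, 30}
	fmt.Println(seekGE(starts, 20))
	fmt.Println(seekLT(starts, 20))
}
```

The boolean result plays the role of the validity flag that the iterator methods return.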

type TimeRange

type TimeRange = telem.TimeRange

type TimeSpan

type TimeSpan = telem.TimeSpan

type TimeStamp

type TimeStamp = telem.TimeStamp

func Now

func Now() TimeStamp

Directories

Path Synopsis
internal
kv
seg
