btrdb

package module
v4.15.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2018 License: BSD-2-Clause Imports: 16 Imported by: 48

README

BTrDB golang bindings

GoDoc

These are the go BTrDB bindings. This branch is compatible with version 4, the recent rewrite of BTrDB.

You can read the API documentation and code examples by clicking the godoc button above. To import this package in your code, add

import "gopkg.in/BTrDB/btrdb.v4"

For more information about installing BTrDB, look at https://docs.smartgrid.store

Documentation

Overview

Package btrdb implementes a golang client driver for btrdb

For functions returning value, version and error channels, please pay attention to the following concurrenct pattern:

  • The value channel must be completely consumed, always.
  • The version channel need not be consumed if not required. Only one value will ever be written to the version channel.
  • The error channel need not be read, but you cannot assume that there was not an error just because there were values
  • You can defer reading the error channel until after the value channel is closed (it will be closed early on error).

A good pattern is the following:

valchan, errchan = some.Method()
for v := range valchan {
  do stuff
}
if <-errchan != nil {
  handle error
}

Index

Constants

View Source
const LatestVersion = 0

LatestVersion can be passed to any functions taking a version to use the latest version of that stream

Variables

View Source
var ErrorClusterDegraded = &CodedError{&pb.Status{Code: 419, Msg: "Cluster is degraded"}}

ErrorClusterDegraded is returned when a write operation on an unmapped UUID is attempted. generally the same operation will succeed if attempted once the cluster has recovered.

View Source
var ErrorDisconnected = &CodedError{&pb.Status{Code: 421, Msg: "Driver is disconnected"}}

ErrorDisconnected is returned when operations are attempted after Disconnect() is called.

View Source
var ErrorWrongArgs = &CodedError{&pb.Status{Code: 421, Msg: "Invalid Arguments"}}

ErrorWrongArgs is returned from API functions if the parameters are nonsensical

Functions

func EndpointsFromEnv

func EndpointsFromEnv() []string

EndpointsFromEnv reads the environment variable BTRDB_ENDPOINTS of the format server:port,server:port,server:port and returns it as a string slice. This function is typically used as btrdb.Connect(btrdb.EndpointsFromEnv()...)

func OptKV

func OptKV(iz ...interface{}) map[string]*string

OptKV is a utility function for use in SetAnnotations or LookupStreams that turns a list of arguments into a map[string]*string. Typical use:

OptKV("key","value", //Set or match key=vale
      "key2", nil)   //Delete or match key2=*

OptKV can also take a single map[string]string and return a map[string]*string, e.g

OptKV(stream.Tags()) //Match exactly this set of tags

Types

type AnnotationVersion

type AnnotationVersion uint64

AnnotationVersion is the version of a stream annotation. It begins at 1 for a newly created stream and increases by 1 for each SetStreamAnnotation call. An AnnotationVersion of 0 means "any version"

type BTrDB

type BTrDB struct {
	// contains filtered or unexported fields
}

BTrDB is the main object you should use to interact with BTrDB.

func Connect

func Connect(ctx context.Context, endpoints ...string) (*BTrDB, error)

Connect takes a list of endpoints and returns a BTrDB handle. Note that only a single endpoint is technically required, but having more endpoints will make the initial connection more robust to cluster changes. Different addresses for the same endpoint are permitted

func ConnectAuth

func ConnectAuth(ctx context.Context, apikey string, endpoints ...string) (*BTrDB, error)

ConnectAuth takes an API key and a list of endpoints and returns a BTrDB handle. Note that only a single endpoint is technically required, but having more endpoints will make the initial connection more robust to cluster changes. Different addresses for the same endpoint are permitted

func (*BTrDB) Create

func (b *BTrDB) Create(ctx context.Context, uu uuid.UUID, collection string, tags map[string]string, annotations map[string]string) (*Stream, error)

func (*BTrDB) Disconnect

func (b *BTrDB) Disconnect() error

Disconnect will close all active connections to the cluster. All future calls will return ErrorDisconnected

func (*BTrDB) EndpointFor

func (b *BTrDB) EndpointFor(ctx context.Context, uuid uuid.UUID) (*Endpoint, error)

EndpointFor returns the endpoint that should be used to write the given uuid

func (*BTrDB) EndpointForHash

func (b *BTrDB) EndpointForHash(ctx context.Context, hash uint32) (*Endpoint, error)

EndpointForHash is a low level function that returns a single endpoint for an endpoint hash.

func (*BTrDB) GetAnyEndpoint

func (b *BTrDB) GetAnyEndpoint(ctx context.Context) (*Endpoint, error)

func (*BTrDB) GetMetadataUsage

func (b *BTrDB) GetMetadataUsage(ctx context.Context, prefix string) (tags map[string]int, annotations map[string]int, err error)

func (*BTrDB) Info

func (b *BTrDB) Info(ctx context.Context) (*MASH, error)

func (*BTrDB) ListAllCollections

func (b *BTrDB) ListAllCollections(ctx context.Context) ([]string, error)

func (*BTrDB) ListCollections

func (b *BTrDB) ListCollections(ctx context.Context, prefix string) ([]string, error)

func (*BTrDB) LookupStreams

func (b *BTrDB) LookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, tags map[string]*string, annotations map[string]*string) ([]*Stream, error)

func (*BTrDB) ReadEndpointFor

func (b *BTrDB) ReadEndpointFor(ctx context.Context, uuid uuid.UUID) (*Endpoint, error)

ReadEndpointFor returns the endpoint that should be used to read the given uuid

func (*BTrDB) ResyncMash

func (b *BTrDB) ResyncMash()

func (*BTrDB) SnoopEpErr

func (b *BTrDB) SnoopEpErr(ep *Endpoint, err chan error) chan error

This should invalidate the endpoint if some kind of error occurs. Because some values may have already been delivered, async functions using snoopEpErr will not be able to mask cluster errors from the user

func (*BTrDB) StreamFromUUID

func (b *BTrDB) StreamFromUUID(uu uuid.UUID) *Stream

StreamFromUUID creates a stream handle for use in stream operations. it does not ensure that the stream exists, for that use Stream.Exists()

func (*BTrDB) StreamingLookupStreams

func (b *BTrDB) StreamingLookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, tags map[string]*string, annotations map[string]*string) (chan *Stream, chan error)

func (*BTrDB) TestEpError

func (b *BTrDB) TestEpError(ep *Endpoint, err error) bool

This returns true if you should redo your operation (and get new ep) and false if you should return the last value/error you got

type ChangedRange

type ChangedRange struct {
	Version uint64
	Start   int64
	End     int64
}

type CodedError

type CodedError struct {
	*pb.Status
}

CodedError is an error that contains a numeric code. Most errors returned by this package are actually *CodedError objects. Use ToCodedError()

func ToCodedError

func ToCodedError(e error) *CodedError

ToCodedError can be used to convert any error into a CodedError. If the error object is actually not coded, it will receive code 501.

func (*CodedError) Error

func (ce *CodedError) Error() string

Error() implements the error interface

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

Endpoint is a low level connection to a single server. Rather use BTrDB which manages creating and destroying Endpoint objects as required

func ConnectEndpoint

func ConnectEndpoint(ctx context.Context, addresses ...string) (*Endpoint, error)

ConnectEndpoint is a low level call that connects to a single BTrDB server. It takes multiple arguments, but it is assumed that they are all different addresses for the same server, in decreasing order of priority. It returns a Endpoint, which is generally never used directly. Rather use Connect()

func ConnectEndpointAuth

func ConnectEndpointAuth(ctx context.Context, apikey string, addresses ...string) (*Endpoint, error)

ConnectEndpointAuth is a low level call that connects to a single BTrDB server. It takes multiple arguments, but it is assumed that they are all different addresses for the same server, in decreasing order of priority. It returns a Endpoint, which is generally never used directly. Rather use ConnectAuthenticated()

func (*Endpoint) AlignedWindows

func (b *Endpoint) AlignedWindows(ctx context.Context, uu uuid.UUID, start int64, end int64, pointwidth uint8, version uint64) (chan StatPoint, chan uint64, chan error)

AlignedWindows is a low level function, rather use Stream.AlignedWindows()

func (*Endpoint) Changes

func (b *Endpoint) Changes(ctx context.Context, uu uuid.UUID, fromVersion uint64, toVersion uint64, resolution uint8) (chan ChangedRange, chan uint64, chan error)

Changes is a low level function, rather use BTrDB.Changes()

func (*Endpoint) Create

func (b *Endpoint) Create(ctx context.Context, uu uuid.UUID, collection string, tags map[string]string, annotations map[string]string) error

Create is a low level function, rather use BTrDB.Create()

func (*Endpoint) DeleteRange

func (b *Endpoint) DeleteRange(ctx context.Context, uu uuid.UUID, start int64, end int64) (uint64, error)

DeleteRange is a low level function, rather use Stream.DeleteRange()

func (*Endpoint) Disconnect

func (b *Endpoint) Disconnect() error

Disconnect will close the underlying GRPC connection. The endpoint cannot be used after calling this method.

func (*Endpoint) FaultInject

func (b *Endpoint) FaultInject(ctx context.Context, typ uint64, args []byte) ([]byte, error)

FaultInject is a debugging function that allows specific low level control of the endpoint. If you have to read the documentation, this is not for you. Server must be started with $BTRDB_ENABLE_FAULT_INJECT=YES

func (*Endpoint) Flush

func (b *Endpoint) Flush(ctx context.Context, uu uuid.UUID) error

Flush is a low level function, rather use Stream.Flush()

func (*Endpoint) GetGRPC

func (b *Endpoint) GetGRPC() pb.BTrDBClient

GetGRPC will return the underlying GRPC client object.

func (*Endpoint) GetMetadataUsage

func (b *Endpoint) GetMetadataUsage(ctx context.Context, prefix string) (tags map[string]int, annotations map[string]int, err error)

GetMetadataUsage is a low level function. Rather use BTrDB.GetMetadataUsage

func (*Endpoint) Info

func (b *Endpoint) Info(ctx context.Context) (*MASH, *pb.InfoResponse, error)

Info is a low level function, rather use BTrDB.Info()

func (*Endpoint) Insert

func (b *Endpoint) Insert(ctx context.Context, uu uuid.UUID, values []*pb.RawPoint) error

Insert is a low level function, rather use Stream.Insert()

func (*Endpoint) ListAllCollections

func (b *Endpoint) ListAllCollections(ctx context.Context) ([]string, error)

ListAllCollections is a low level function, and in particular will only work with small numbers of collections. Rather use BTrDB.ListAllCollections()

func (*Endpoint) ListCollections

func (b *Endpoint) ListCollections(ctx context.Context, prefix string, from string, limit uint64) ([]string, error)

ListCollections is a low level function, and in particular has complex constraints. Rather use BTrDB.ListCollections()

func (*Endpoint) LookupStreams

func (b *Endpoint) LookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, tags map[string]*string, annotations map[string]*string, patchDB *BTrDB) (chan *Stream, chan error)

LookupStreams is a low level function, rather use BTrDB.LookupStreams()

func (*Endpoint) Nearest

func (b *Endpoint) Nearest(ctx context.Context, uu uuid.UUID, time int64, version uint64, backward bool) (RawPoint, uint64, error)

Nearest is a low level function, rather use Stream.Nearest()

func (*Endpoint) Obliterate

func (b *Endpoint) Obliterate(ctx context.Context, uu uuid.UUID) error

Obliterate is a low level function, rather use Stream.Obliterate()

func (*Endpoint) RawValues

func (b *Endpoint) RawValues(ctx context.Context, uu uuid.UUID, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)

RawValues is a low level function, rather use Stream.RawValues()

func (*Endpoint) SetStreamAnnotations

func (b *Endpoint) SetStreamAnnotations(ctx context.Context, uu uuid.UUID, expected AnnotationVersion, changes map[string]*string) error

SetStreamAnnotation is a low level function, rather use Stream.SetAnnotation() or Stream.CompareAndSetAnnotation()

func (*Endpoint) StreamInfo

func (b *Endpoint) StreamInfo(ctx context.Context, uu uuid.UUID, omitDescriptor bool, omitVersion bool) (
	collection string,
	aver AnnotationVersion,
	tags map[string]string,
	anns map[string]string,
	version uint64, err error)

StreamAnnotation is a low level function, rather use Stream.Annotation()

func (*Endpoint) Windows

func (b *Endpoint) Windows(ctx context.Context, uu uuid.UUID, start int64, end int64, width uint64, depth uint8, version uint64) (chan StatPoint, chan uint64, chan error)

Windows is a low level function, rather use Stream.Windows()

type M

type M map[string]string

M is an alias to neaten code specifying tags: btrdb.LookupStream(ctx, "mycollection", btrdb.M{"tagkey":"tagval"})

type MASH

type MASH struct {
	*pb.Mash
	// contains filtered or unexported fields
}

The MASH struct (Master Allocation by Stable Hashing) contains information about the cluster and which fraction of the uuid space is being served by which endpoints. Generally you will not need to use this, but it is handy for checking the cluster is healthy.

func (*MASH) EndpointFor

func (m *MASH) EndpointFor(uuid uuid.UUID) (found bool, hash uint32, addrs []string)

EndpointFor will take a uuid and return the connection details for the endpoint that can service a write to that uuid. This is a low level function.

type RawPoint

type RawPoint struct {
	//Nanoseconds since the epoch
	Time int64
	//Value. Units are stream-dependent
	Value float64
}

RawPoint represents a single timestamped value

type StatPoint

type StatPoint struct {
	//The time of the start of the window, in nanoseconds since the epoch UTC
	Time  int64
	Min   float64
	Mean  float64
	Max   float64
	Count uint64
}

StatPoint represents a statistical summary of a window. The length of that window must be determined from context (e.g the parameters passed to AlignedWindow or Window methods)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a handle on a Stream in BTrDB. Stream operations should be done through this object.

func (*Stream) AlignedWindows

func (s *Stream) AlignedWindows(ctx context.Context, start int64, end int64, pointwidth uint8, version uint64) (chan StatPoint, chan uint64, chan error)

AlignedWindows reads power-of-two aligned windows from BTrDB. It is faster than Windows(). Each returned window will be 2^pointwidth nanoseconds long, starting at start. Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not powers of two, the bottom pointwidth bits will be cleared. Each window will contain statistical summaries of the window. Statistical points with count == 0 will be omitted.

func (*Stream) Annotations

func (s *Stream) Annotations(ctx context.Context) (map[string]string, AnnotationVersion, error)

Annotations returns the annotations of the stream (and the annotation version). It will always require a round trip to the server. If you are ok with stale data and want a higher performance version, use Stream.CachedAnnotations(). Do not modify the resulting map.

func (*Stream) CachedAnnotations

func (s *Stream) CachedAnnotations(ctx context.Context) (map[string]string, AnnotationVersion, error)

CachedAnnotations returns the annotations of the stream, reusing previous results if available, otherwise fetching from the server

func (*Stream) Changes

func (s *Stream) Changes(ctx context.Context, fromVersion uint64, toVersion uint64, resolution uint8) (crv chan ChangedRange, cver chan uint64, cerr chan error)

func (*Stream) Collection

func (s *Stream) Collection(ctx context.Context) (string, error)

Collection returns the collection of the stream. It may require a round trip to the server depending on how the stream was acquired

func (*Stream) CompareAndSetAnnotation

func (s *Stream) CompareAndSetAnnotation(ctx context.Context, expected AnnotationVersion, changes map[string]*string) error

CompareAndSetAnnotation will make the changes in the given map (where a nil pointer means delete) as long as the annotation version matches

func (*Stream) DeleteRange

func (s *Stream) DeleteRange(ctx context.Context, start int64, end int64) (ver uint64, err error)

DeleteRange will delete all points between start (inclusive) and end (exclusive). Note that BTrDB has persistent multiversioning, so the deleted points can still be accessed on an older version of the stream returns the version of the stream and any error

func (*Stream) Exists

func (s *Stream) Exists(ctx context.Context) (bool, error)

Exists returns true if the stream exists. This is essential after using StreamFromUUID as the stream may not exist, causing a 404 error on later stream operations. Any operation that returns a stream from collection and tags will have ensured the stream exists already.

func (*Stream) Flush

func (s *Stream) Flush(ctx context.Context) error

Flush writes the stream buffers out to persistent storage

func (*Stream) Insert

func (s *Stream) Insert(ctx context.Context, vals []RawPoint) error

Insert inserts the given array of RawPoint values. If the array is larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large arrays.

func (*Stream) InsertF

func (s *Stream) InsertF(ctx context.Context, length int, time func(int) int64, val func(int) float64) error

InsertF will call the given time and val functions to get each value of the insertion. It is similar to InsertTV but may require less allocations if your data is already in a different data structure. If the size is larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large size.

func (*Stream) InsertTV

func (s *Stream) InsertTV(ctx context.Context, times []int64, values []float64) error

InsertTV allows insertion of two equal length arrays, one containing times and the other containing values. The arrays need not be sorted, but they must correspond (i.e the first element of times is the time for the firt element of values). If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large arrays.

func (*Stream) Nearest

func (s *Stream) Nearest(ctx context.Context, time int64, version uint64, backward bool) (rv RawPoint, ver uint64, err error)

Nearest will return the nearest point to the given time. If backward is false, the returned point will be >= time. If backward is true, the returned point will be <time. The version of the stream used to satisfy the query is returned.

func (*Stream) Obliterate

func (s *Stream) Obliterate(ctx context.Context) error

Obliterate completely removes a stream. This operation is immediate but the space will only be freed slowly

func (*Stream) RawValues

func (s *Stream) RawValues(ctx context.Context, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)

RawValues reads raw values from BTrDB. The returned RawPoint channel must be fully consumed.

func (*Stream) Tags

func (s *Stream) Tags(ctx context.Context) (map[string]string, error)

Tags returns the tags of the stream. It may require a round trip to the server depending on how the stream was acquired. Do not modify the resulting map as it is a reference to the internal stream state

func (*Stream) UUID

func (s *Stream) UUID() uuid.UUID

UUID returns the stream's UUID. The stream may nor may not exist yet, depending on how the stream object was obtained. See also Stream.Exists()

func (*Stream) Version

func (s *Stream) Version(ctx context.Context) (uint64, error)

Version returns the current data version of the stream. This is not cached, it queries each time. Take care that you do not intorduce races in your code by assuming this function will always return the same vaue

func (*Stream) Windows

func (s *Stream) Windows(ctx context.Context, start int64, end int64, width uint64, depth uint8, version uint64) (chan StatPoint, chan uint64, chan error)

Windows returns arbitrary precision windows from BTrDB. It is slower than AlignedWindows, but still significantly faster than RawValues. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width). The depth parameter is an optimization that can be used to speed up queries on fast queries. Each window will be accurate to 2^depth nanoseconds. If depth is zero, the results are accurate to the nanosecond. On a dense stream for large windows, this accuracy may not be required. For example for a window of a day, +- one second may be appropriate, so a depth of 30 can be specified. This is much faster to execute on the database side. The StatPoint channel MUST be fully consumed.

Directories

Path Synopsis
Package grpcinterface is a reverse proxy.
Package grpcinterface is a reverse proxy.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL