redists

package module
v0.5.2
Published: May 18, 2022 License: MIT Imports: 7 Imported by: 0

README

RedisTS

RedisTS is a typesafe Go client for RedisTimeSeries.

RedisTimeSeries documentation: https://oss.redis.com/redistimeseries/commands/

This library tries to support multiple Redis clients, because applications probably already use one. There are examples in the reference for github.com/go-redis/redis/v8, github.com/gomodule/redigo, github.com/joomcode/redispipe, and github.com/mediocregopher/radix/v4 demonstrating how one can create a new RedisTS client using them.

Focus

RedisTS was created during a coding spree which had the following focus:

  1. Functional options for friendly APIs.
  2. Type safety.
  3. Switched word order for better autocompletion and to follow the naming scheme used in stdlib (e.g. http.MethodGet, http.StatusNotFound).
  4. Compatibility with multiple Redis clients.
  5. Accept time.Time and time.Duration where a parameter is a timestamp in milliseconds or a duration in milliseconds.

Get module

go get -u github.com/coding-socks/redists

Running tests

go test

The tests expect a Redis server with RedisTimeSeries ^v1.6 module to be available at localhost:6379. One can use -test.short to skip those tests.

go test -test.short

The following command runs a Redis server with the "edge" version of RedisTimeSeries via Docker.

docker run --name dev-redists -p 6379:6379 -d redislabs/redistimeseries:edge

Supported clients

RedisTS is tested with the following clients:

  • github.com/go-redis/redis/v8
  • github.com/gomodule/redigo
  • github.com/joomcode/redispipe
  • github.com/mediocregopher/radix/v4

It probably works with others, but this is not guaranteed. Feel free to open an issue to request support for another client; if it does not take too much effort, the client will be added to the list above.

Production readiness

This project is still in the alpha phase. At this stage the public API can change multiple times a day.

A beta version will be considered when the feature set covers the documents the implementation is based on, and the public API reaches a mature state.

Alternative libraries

Contribution

Any type of contribution is welcome: features, bug fixes, documentation improvements, feedback, and questions. Although GitHub uses the word "issue", feel free to open a GitHub issue for any of these.

License

RedisTS is distributed under the MIT license.

Documentation

Overview

Package redists is a typesafe Go client for RedisTimeSeries. It tries to support multiple Redis clients, because applications probably already use one.

Creating a client with GoRedis

The following example shows how to create a Doer implementation with GoRedis:

package main

import (
	"context"
	goredis "github.com/go-redis/redis/v8"
	"github.com/coding-socks/redists"
)

type goredisDoer struct {
	c *goredis.Client
}

func (f goredisDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
	args = append([]interface{}{cmd}, args...)
	return f.c.Do(ctx, args...).Result()
}

func main() {
	client := goredis.NewClient(&goredis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	})
	defer client.Close()
	err := client.Ping(context.Background()).Err()
	if err != nil {
		panic(err)
	}
	tsclient := redists.NewClient(goredisDoer{client})

	v := client.Exists(context.Background(), "example:goredis").Val()
	if v == 0 {
		err = tsclient.Create(context.Background(), "example:goredis")
		if err != nil {
			panic(err)
		}
	}
	_, err = tsclient.Add(context.Background(), redists.NewSample("example:goredis", redists.TSAuto(), 0.5))
	if err != nil {
		panic(err)
	}

}

Creating a client with Redigo

The following example shows how to create a Doer implementation with Redigo:

package main

import (
	"context"
	redigo "github.com/gomodule/redigo/redis"
	"github.com/coding-socks/redists"
	"time"
)

type redigoDoer struct {
	c redigo.Conn
}

func (f redigoDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
	deadline, ok := ctx.Deadline()
	if ok {
		// time.Until yields the time remaining before the deadline.
		return redigo.DoWithTimeout(f.c, time.Until(deadline), cmd, args...)
	}
	return f.c.Do(cmd, args...)
}

func main() {
	conn, err := redigo.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	tsclient := redists.NewClient(redigoDoer{conn})

	v, _ := redigo.Int(conn.Do("EXISTS", "example:redigo"))
	if v == 0 {
		err = tsclient.Create(context.Background(), "example:redigo")
		if err != nil {
			panic(err)
		}
	}
	_, err = tsclient.Add(context.Background(), redists.NewSample("example:redigo", redists.TSAuto(), 0.5))
	if err != nil {
		panic(err)
	}
}

Creating a client with RedisPipe

The following example shows how to create a Doer implementation with RedisPipe:

package main

import (
	"context"
	redispipe "github.com/joomcode/redispipe/redis"
	"github.com/joomcode/redispipe/redisconn"
	"github.com/coding-socks/redists"
)

type redispipeDoer struct {
	s redispipe.Sender
}

func (f redispipeDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
	res := redispipe.SyncCtx{f.s}.Do(ctx, cmd, args...)
	if err := redispipe.AsError(res); err != nil {
		return nil, err
	}
	return res, nil
}

func main() {
	sender, err := redisconn.Connect(context.Background(), "localhost:6379", redisconn.Opts{})
	if err != nil {
		panic(err)
	}
	// Defer Close only after the connection is known to be valid.
	defer sender.Close()
	tsclient := redists.NewClient(redispipeDoer{sender})

	sync := redispipe.SyncCtx{sender}
	res := sync.Do(context.Background(), "EXISTS", "example:redispipe")
	if res.(int64) == 0 {
		err = tsclient.Create(context.Background(), "example:redispipe")
		if err != nil {
			panic(err)
		}
	}
	_, err = tsclient.Add(context.Background(), redists.NewSample("example:redispipe", redists.TSAuto(), 0.5))
	if err != nil {
		panic(err)
	}
}

Creating a client with Radix

The following example shows how to create a Doer implementation with Radix:

package main

import (
	"context"
	"github.com/mediocregopher/radix/v4"
	"github.com/mediocregopher/radix/v4/resp"
	"github.com/coding-socks/redists"
)

type radixDoer struct {
	c radix.Client
}

func (f radixDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
	var val interface{}
	err := f.c.Do(ctx, radix.FlatCmd(&val, cmd, args...))
	if e, ok := val.(error); err == nil && ok {
		return nil, e
	}
	return val, err
}

func main() {
	d := radix.Dialer{
		NewRespOpts: func() *resp.Opts {
			opts := resp.NewOpts()
			opts.DisableErrorBubbling = true
			return opts
		},
	}
	client, err := (radix.PoolConfig{Dialer: d}).New(context.Background(), "tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer client.Close()
	tsclient := redists.NewClient(radixDoer{client})

	var v int
	err = client.Do(context.Background(), radix.Cmd(&v, "EXISTS", "example:radix"))
	if err != nil {
		panic(err)
	}
	if v == 0 {
		err = tsclient.Create(context.Background(), "example:radix")
		if err != nil {
			panic(err)
		}
	}
	_, err = tsclient.Add(context.Background(), redists.NewSample("example:radix", redists.TSAuto(), 0.5))
	if err != nil {
		panic(err)
	}
}

Index

Constants

const (
	// EncodingCompressed applies the DoubleDelta compression to the series samples.
	EncodingCompressed = Encoding("COMPRESSED")
	// EncodingUncompressed keeps the raw samples in memory.
	EncodingUncompressed = Encoding("UNCOMPRESSED")
)
const (
	// DuplicatePolicyBlock raises an error for any out of order sample.
	DuplicatePolicyBlock = DuplicatePolicy("BLOCK")
	// DuplicatePolicyFirst ignores the new value.
	DuplicatePolicyFirst = DuplicatePolicy("FIRST")
	// DuplicatePolicyLast overrides with the latest value.
	DuplicatePolicyLast = DuplicatePolicy("LAST")
	// DuplicatePolicyMin only overrides if the value is lower than the existing value.
	DuplicatePolicyMin = DuplicatePolicy("MIN")
	// DuplicatePolicyMax only overrides if the value is higher than the existing value.
	DuplicatePolicyMax = DuplicatePolicy("MAX")
	// DuplicatePolicySum in case a previous sample exists, adds the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value.
	DuplicatePolicySum = DuplicatePolicy("SUM")
)
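
The duplicate policies described in the comments above can be modelled as a pure function. This is only a sketch of the documented semantics for a sample whose timestamp already exists; the server performs the real resolution. The resolve function and the errDuplicate value are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type DuplicatePolicy string

const (
	DuplicatePolicyBlock = DuplicatePolicy("BLOCK")
	DuplicatePolicyFirst = DuplicatePolicy("FIRST")
	DuplicatePolicyLast  = DuplicatePolicy("LAST")
	DuplicatePolicyMin   = DuplicatePolicy("MIN")
	DuplicatePolicyMax   = DuplicatePolicy("MAX")
	DuplicatePolicySum   = DuplicatePolicy("SUM")
)

// errDuplicate is a hypothetical error used only in this sketch.
var errDuplicate = errors.New("duplicate sample blocked")

// resolve returns the value that remains stored when a sample arrives
// for a timestamp that already holds the value existing.
func resolve(p DuplicatePolicy, existing, incoming float64) (float64, error) {
	switch p {
	case DuplicatePolicyBlock:
		return existing, errDuplicate
	case DuplicatePolicyFirst:
		return existing, nil
	case DuplicatePolicyLast:
		return incoming, nil
	case DuplicatePolicyMin:
		if incoming < existing {
			return incoming, nil
		}
		return existing, nil
	case DuplicatePolicyMax:
		if incoming > existing {
			return incoming, nil
		}
		return existing, nil
	case DuplicatePolicySum:
		return existing + incoming, nil
	default:
		return existing, fmt.Errorf("unknown duplicate policy %q", p)
	}
}

func main() {
	policies := []DuplicatePolicy{
		DuplicatePolicyBlock, DuplicatePolicyFirst, DuplicatePolicyLast,
		DuplicatePolicyMin, DuplicatePolicyMax, DuplicatePolicySum,
	}
	for _, p := range policies {
		v, err := resolve(p, 10, 4)
		fmt.Println(p, v, err)
	}
}
```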
const (
	ReducerSum = ReducerType("SUM")
	ReducerMin = ReducerType("MIN")
	ReducerMax = ReducerType("MAX")
)
const (
	AggregationTypeAvg   = AggregationType("AVG")
	AggregationTypeSum   = AggregationType("SUM")
	AggregationTypeMin   = AggregationType("MIN")
	AggregationTypeMax   = AggregationType("MAX")
	AggregationTypeRange = AggregationType("RANGE")
	AggregationTypeCount = AggregationType("COUNT")
	AggregationTypeFirst = AggregationType("FIRST")
	AggregationTypeLast  = AggregationType("LAST")
	AggregationTypeStdP  = AggregationType("STD.P")
	AggregationTypeStdS  = AggregationType("STD.S")
	AggregationTypeVarP  = AggregationType("VAR.P")
	AggregationTypeVarS  = AggregationType("VAR.S")
)
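
The aggregation types carry no comments in the reference, so the following sketch spells out what each name conventionally computes over the values of a single time bucket. It is an illustration under that assumption, not the server's code: the aggregate function is hypothetical, and the P and S suffixes are taken to mean population and sample statistics.

```go
package main

import (
	"fmt"
	"math"
)

type AggregationType string

const (
	AggregationTypeAvg   = AggregationType("AVG")
	AggregationTypeSum   = AggregationType("SUM")
	AggregationTypeMin   = AggregationType("MIN")
	AggregationTypeMax   = AggregationType("MAX")
	AggregationTypeRange = AggregationType("RANGE")
	AggregationTypeCount = AggregationType("COUNT")
	AggregationTypeFirst = AggregationType("FIRST")
	AggregationTypeLast  = AggregationType("LAST")
	AggregationTypeStdP  = AggregationType("STD.P")
	AggregationTypeStdS  = AggregationType("STD.S")
	AggregationTypeVarP  = AggregationType("VAR.P")
	AggregationTypeVarS  = AggregationType("VAR.S")
)

// aggregate (hypothetical) reduces the values of one non-empty bucket,
// given in timestamp order, to a single number.
func aggregate(t AggregationType, vs []float64) float64 {
	n := float64(len(vs))
	sum, lo, hi := 0.0, vs[0], vs[0]
	for _, v := range vs {
		sum += v
		lo = math.Min(lo, v)
		hi = math.Max(hi, v)
	}
	mean := sum / n
	sq := 0.0 // sum of squared deviations from the mean
	for _, v := range vs {
		sq += (v - mean) * (v - mean)
	}
	switch t {
	case AggregationTypeAvg:
		return mean
	case AggregationTypeSum:
		return sum
	case AggregationTypeMin:
		return lo
	case AggregationTypeMax:
		return hi
	case AggregationTypeRange:
		return hi - lo
	case AggregationTypeCount:
		return n
	case AggregationTypeFirst:
		return vs[0]
	case AggregationTypeLast:
		return vs[len(vs)-1]
	case AggregationTypeVarP:
		return sq / n
	case AggregationTypeVarS:
		return sq / (n - 1) // NaN in this sketch for a single value
	case AggregationTypeStdP:
		return math.Sqrt(sq / n)
	case AggregationTypeStdS:
		return math.Sqrt(sq / (n - 1))
	}
	return math.NaN()
}

func main() {
	bucket := []float64{2, 4, 4, 4, 5, 5, 7, 9}
	for _, t := range []AggregationType{AggregationTypeAvg, AggregationTypeRange, AggregationTypeStdP} {
		fmt.Println(t, aggregate(t, bucket))
	}
}
```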

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregation

type Aggregation struct {
	Type       AggregationType
	TimeBucket Duration
}

type AggregationType

type AggregationType string

type ChunkInfo

type ChunkInfo struct {
	StartTimestamp time.Time
	EndTimestamp   time.Time
	Samples        int64
	Size           int64
	BytesPerSample float64
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(d Doer) *Client

func (*Client) Add

func (c *Client) Add(ctx context.Context, s Sample, options ...OptionAdd) (time.Time, error)

Add appends a new sample to a time-series.

func (*Client) Alter

func (c *Client) Alter(ctx context.Context, key string, options ...OptionAlter) error

Alter updates the retention, chunk size, duplicate policy, and labels of an existing key.

func (*Client) Create

func (c *Client) Create(ctx context.Context, key string, options ...OptionCreate) error

Create creates a new time-series.

func (*Client) CreateRule

func (c *Client) CreateRule(ctx context.Context, srcKey, destKey string, a AggregationType, timeBucket Duration) error

CreateRule creates a compaction rule.

func (*Client) DecrBy

func (c *Client) DecrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)

DecrBy creates a new sample that decrements the latest sample's value.

func (*Client) Del

func (c *Client) Del(ctx context.Context, key string, from time.Time, to time.Time) (int64, error)

Del deletes samples between two timestamps for a given key.

func (*Client) DeleteRule

func (c *Client) DeleteRule(ctx context.Context, srcKey, destKey string) error

DeleteRule deletes a compaction rule.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key string) (*DataPoint, error)

Get gets the last sample.

func (*Client) IncrBy

func (c *Client) IncrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)

IncrBy creates a new sample that increments the latest sample's value.

func (*Client) Info

func (c *Client) Info(ctx context.Context, key string, options ...OptionInfo) (Info, error)

Info returns information and statistics on the time-series.

func (*Client) MAdd

func (c *Client) MAdd(ctx context.Context, s []Sample) ([]MultiResult, error)

MAdd appends new samples to a list of series.

func (*Client) MGet

func (c *Client) MGet(ctx context.Context, filters []Filter, options ...OptionMGet) ([]LastDatapoint, error)

MGet gets the last samples matching the specific filter.

func (*Client) MRange

func (c *Client) MRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, options ...OptionMRanger) ([]TimeSeries, error)

MRange queries a range across multiple time-series by filters in forward direction.

func (*Client) MRevRange

func (c *Client) MRevRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, options ...OptionMRanger) ([]TimeSeries, error)

MRevRange queries a range across multiple time-series by filters in reverse direction.

func (*Client) QueryIndex

func (c *Client) QueryIndex(ctx context.Context, filters []Filter) ([]string, error)

QueryIndex lists all the keys matching the filter list.

func (*Client) Range

func (c *Client) Range(ctx context.Context, key string, from Timestamp, to Timestamp, options ...OptionRanger) ([]DataPoint, error)

Range queries a range in forward direction.

func (*Client) RevRange

func (c *Client) RevRange(ctx context.Context, key string, from Timestamp, to Timestamp, options ...OptionRanger) ([]DataPoint, error)

RevRange queries a range in reverse direction.

type DataPoint

type DataPoint struct {
	Timestamp time.Time
	Value     float64
}

type Doer

type Doer interface {
	Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error)
}

type DuplicatePolicy

type DuplicatePolicy string

type Duration added in v0.5.1

type Duration interface {
	Milliseconds() int64
}

type Encoding

type Encoding string

type Filter

type Filter struct {
	Label  string
	Equal  bool
	Values []string
}

func FilterEqual

func FilterEqual(label string, values ...string) Filter

func FilterNotEqual

func FilterNotEqual(label string, values ...string) Filter

func (Filter) Arg

func (f Filter) Arg() interface{}
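
The reference does not show what Arg returns. As a rough sketch, a Filter could be rendered into the label filter syntax that RedisTimeSeries accepts (label=value, label!=value, label=(v1,v2), and the bare label= and label!= forms). The render function is hypothetical, and the Filter type and constructors are re-declared locally to keep the example self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// Filter mirrors the struct listed in the reference.
type Filter struct {
	Label  string
	Equal  bool
	Values []string
}

func FilterEqual(label string, values ...string) Filter {
	return Filter{Label: label, Equal: true, Values: values}
}

func FilterNotEqual(label string, values ...string) Filter {
	return Filter{Label: label, Equal: false, Values: values}
}

// render (hypothetical) converts a Filter into a filter expression string.
func render(f Filter) string {
	op := "="
	if !f.Equal {
		op = "!="
	}
	switch len(f.Values) {
	case 0:
		// No values: match on the absence or presence of the label.
		return f.Label + op
	case 1:
		return f.Label + op + f.Values[0]
	default:
		return f.Label + op + "(" + strings.Join(f.Values, ",") + ")"
	}
}

func main() {
	fmt.Println(render(FilterEqual("region", "eu", "us")))
	fmt.Println(render(FilterNotEqual("env", "test")))
	fmt.Println(render(FilterNotEqual("host")))
}
```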

type GroupBy

type GroupBy struct {
	Label   string
	Reducer ReducerType
}

type Info

type Info struct {
	TotalSamples    int64
	MemoryUsage     int64
	FirstTimestamp  time.Time
	LastTimestamp   time.Time
	RetentionTime   time.Duration
	ChunkCount      int64
	ChunkSize       int64
	ChunkType       Encoding
	DuplicatePolicy *DuplicatePolicy
	Labels          Labels
	SourceKey       string
	Rules           Rules
	Chunks          []ChunkInfo
}

type Labels

type Labels map[string]string

type LastDatapoint

type LastDatapoint struct {
	Key       string
	Labels    Labels
	DataPoint *DataPoint
}

type MultiResult

type MultiResult struct {
	// contains filtered or unexported fields
}

MultiResult contains an error when a specific Sample triggers an error.

func (MultiResult) Err

func (r MultiResult) Err() error

func (MultiResult) Time

func (r MultiResult) Time() time.Time

type OptionAdd

type OptionAdd func(cmd *cmdAdd)

func AddWithChunkSize

func AddWithChunkSize(cs int) OptionAdd

func AddWithEncoding

func AddWithEncoding(e Encoding) OptionAdd

func AddWithLabels

func AddWithLabels(ls Labels) OptionAdd

func AddWithOnDuplicate

func AddWithOnDuplicate(dp DuplicatePolicy) OptionAdd

func AddWithRetention

func AddWithRetention(r Duration) OptionAdd

type OptionAlter

type OptionAlter func(cmd *cmdAlter)

func AlterWithChunkSize added in v0.5.1

func AlterWithChunkSize(cs int) OptionAlter

func AlterWithDuplicatePolicy added in v0.5.1

func AlterWithDuplicatePolicy(dp DuplicatePolicy) OptionAlter

func AlterWithLabels

func AlterWithLabels(ls Labels) OptionAlter

func AlterWithRetention

func AlterWithRetention(r Duration) OptionAlter

type OptionCounter

type OptionCounter func(cmd *cmdCounter)

func CounterWithChunkSize

func CounterWithChunkSize(cs int) OptionCounter

func CounterWithEncoding

func CounterWithEncoding(e Encoding) OptionCounter

func CounterWithLabels

func CounterWithLabels(ls Labels) OptionCounter

func CounterWithRetention

func CounterWithRetention(r Duration) OptionCounter

func CounterWithTimestamp

func CounterWithTimestamp(t time.Time) OptionCounter

type OptionCreate

type OptionCreate func(cmd *cmdCreate)

func CreateWithChunkSize

func CreateWithChunkSize(cs int) OptionCreate

func CreateWithDuplicatePolicy

func CreateWithDuplicatePolicy(dp DuplicatePolicy) OptionCreate

func CreateWithEncoding

func CreateWithEncoding(e Encoding) OptionCreate

func CreateWithLabels

func CreateWithLabels(ls Labels) OptionCreate

func CreateWithRetention

func CreateWithRetention(r Duration) OptionCreate

type OptionInfo

type OptionInfo func(cmd *cmdInfo)

func InfoWithDebug

func InfoWithDebug() OptionInfo

type OptionMGet

type OptionMGet func(cmd *cmdMGet)

func MGetWithLabels

func MGetWithLabels(labels ...string) OptionMGet

type OptionMRanger

type OptionMRanger func(cmd *cmdMRanger)

func MRangerWithAggregation

func MRangerWithAggregation(t AggregationType, timeBucket Duration) OptionMRanger

func MRangerWithAlign

func MRangerWithAlign(a Timestamp) OptionMRanger

func MRangerWithCount

func MRangerWithCount(c int64) OptionMRanger

func MRangerWithGroupBy

func MRangerWithGroupBy(label string, reducer ReducerType) OptionMRanger

func MRangerWithLabels

func MRangerWithLabels(labels ...string) OptionMRanger

func MRangerWithTSFilter

func MRangerWithTSFilter(tss ...time.Time) OptionMRanger

func MRangerWithValueFilter

func MRangerWithValueFilter(min float64, max float64) OptionMRanger

type OptionRanger

type OptionRanger func(cmd *cmdRanger)

func RangerWithAggregation

func RangerWithAggregation(t AggregationType, timeBucket Duration) OptionRanger

func RangerWithAlign

func RangerWithAlign(a Timestamp) OptionRanger

func RangerWithCount

func RangerWithCount(c int64) OptionRanger

func RangerWithTSFilter

func RangerWithTSFilter(tss ...time.Time) OptionRanger

func RangerWithValueFilter

func RangerWithValueFilter(min float64, max float64) OptionRanger

type ReducerType

type ReducerType string

type Rules

type Rules map[string]Aggregation

type Sample

type Sample struct {
	Key       string
	Timestamp Timestamp
	Value     float64
}

func NewSample

func NewSample(key string, timestamp Timestamp, value float64) Sample

type TS

type TS struct {
	time.Time
	// contains filtered or unexported fields
}

TS can represent time.Time, `-`, `+`, and `*`.

func TSAuto

func TSAuto() TS

TSAuto returns a TS which represents `*`.

func TSMax

func TSMax() TS

TSMax returns a TS which represents `+`.

func TSMin

func TSMin() TS

TSMin returns a TS which represents `-`.

func (TS) Auto

func (t TS) Auto() bool

func (TS) Max

func (t TS) Max() bool

func (TS) Min

func (t TS) Min() bool

type TimeSeries

type TimeSeries struct {
	Key        string
	Labels     Labels
	DataPoints []DataPoint
}

type Timestamp

type Timestamp interface {
	UnixMilli() int64
}

type TimestampAuto

type TimestampAuto interface {
	Timestamp
	Auto() bool
}

type TimestampMax

type TimestampMax interface {
	Timestamp
	Max() bool
}

type TimestampMin

type TimestampMin interface {
	Timestamp
	Min() bool
}
