Documentation
¶
Overview ¶
Package redists is a typesafe Go client for RedisTimeSeries. It tries to support multiple Redis clients, because applications probably already use one.
Creating a client with GoRedis ¶
The following example shows how to create a Doer implementation with GoRedis:
package main
import (
"context"
goredis "github.com/go-redis/redis/v8"
"github.com/nerg4l/redists"
"time"
)
type goredisDoer struct {
c *goredis.Client
}
func (f goredisDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
args = append([]interface{}{cmd}, args...)
return f.c.Do(ctx, args...).Result()
}
func main() {
client := goredis.NewClient(&goredis.Options{
Addr: "localhost:6379", // use default Addr
Password: "", // no password set
DB: 0, // use default DB
})
defer client.Close()
err := client.Ping(context.Background()).Err()
if err != nil {
panic(err)
}
tsclient := redists.NewClient(goredisDoer{client})
v := client.Exists(context.Background(), "example:goredis").Val()
if v == 0 {
err = tsclient.Create(context.Background(), "example:goredis")
if err != nil {
panic(err)
}
}
_, err = tsclient.Add(context.Background(), redists.NewSample("example:goredis", redists.TSAuto(), 0.5))
if err != nil {
panic(err)
}
}
Creating a client with Redigo ¶
The following example shows how to create a Doer implementation with Redigo:
package main
import (
"context"
redigo "github.com/gomodule/redigo/redis"
"github.com/nerg4l/redists"
"time"
)
type redigoDoer struct {
c redigo.Conn
}
func (f redigoDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
deadline, ok := ctx.Deadline()
if ok {
return redigo.DoWithTimeout(f.c, time.Now().Sub(deadline), cmd, args...)
}
return f.c.Do(cmd, args...)
}
func main() {
conn, err := redigo.Dial("tcp", "localhost:6379")
if err != nil {
panic(err)
}
defer conn.Close()
tsclient := redists.NewClient(redigoDoer{conn})
v, _ := redigo.Int(conn.Do("EXISTS", "example:redigo"))
if v == 0 {
err = tsclient.Create(context.Background(), "example:redigo")
if err != nil {
panic(err)
}
}
_, err = tsclient.Add(context.Background(), redists.NewSample("example:redigo", redists.TSAuto(), 0.5))
if err != nil {
panic(err)
}
}
Creating a client with RedisPipe ¶
The following example shows how to create a Doer implementation with RedisPipe:
package main
import (
"context"
redispipe "github.com/joomcode/redispipe/redis"
"github.com/joomcode/redispipe/redisconn"
"github.com/nerg4l/redists"
"time"
)
type redispipeDoer struct {
s redispipe.Sender
}
func (f redispipeDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
res := redispipe.SyncCtx{f.s}.Do(ctx, cmd, args...)
if err := redispipe.AsError(res); err != nil {
return nil, err
}
return res, nil
}
func main() {
sender, err := redisconn.Connect(context.Background(), "localhost:6379", redisconn.Opts{})
defer sender.Close()
if err != nil {
panic(err)
}
tsclient := redists.NewClient(redispipeDoer{sender})
sync := redispipe.SyncCtx{sender}
res := sync.Do(context.Background(), "EXISTS", "example:redispipe")
if res.(int64) == 0 {
err = tsclient.Create(context.Background(), "example:redispipe")
if err != nil {
panic(err)
}
}
_, err = tsclient.Add(context.Background(), redists.NewSample("example:redispipe", redists.TSAuto(), 0.5))
if err != nil {
panic(err)
}
}
Creating a client with Radix ¶
**!IMPORTANT!** `MAdd` will not return a list of results in case of an error when used with `github.com/mediocregopher/radix/v4`. An issue was already opened: https://github.com/mediocregopher/radix/issues/305
The following example shows how to create a Doer implementation with Radix:
package main
import (
"context"
"github.com/mediocregopher/radix/v4"
"github.com/nerg4l/redists"
"time"
)
type radixDoer struct {
c radix.Client
}
func (f radixDoer) Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
var val interface{}
err := f.c.Do(ctx, radix.FlatCmd(&val, cmd, args...))
return val, err
}
func main() {
client, err := (radix.PoolConfig{}).New(context.Background(), "tcp", "localhost:6379")
if err != nil {
panic(err)
}
defer client.Close()
tsclient := redists.NewClient(radixDoer{client})
var v int
client.Do(context.Background(), radix.Cmd(&v, "EXISTS", "example:radix"))
if v == 0 {
err = tsclient.Create(context.Background(), "example:radix")
if err != nil {
panic(err)
}
}
_, err = tsclient.Add(context.Background(), redists.NewSample("example:radix", redists.TSAuto(), 0.5))
if err != nil {
panic(err)
}
}
Index ¶
- Constants
- type Aggregation
- type AggregationType
- type ChunkInfo
- type Client
- func (c *Client) Add(ctx context.Context, s Sample, options ...OptionAdd) (time.Time, error)
- func (c *Client) Alter(ctx context.Context, key string, options ...OptionAlter) error
- func (c *Client) Create(ctx context.Context, key string, options ...OptionCreate) error
- func (c *Client) CreateRule(ctx context.Context, srcKey, destKey string, a AggregationType, ...) error
- func (c *Client) DecrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)
- func (c *Client) Del(ctx context.Context, key string, from time.Time, to time.Time) (int64, error)
- func (c *Client) DeleteRule(ctx context.Context, srcKey, destKey string) error
- func (c *Client) Get(ctx context.Context, key string) (*DataPoint, error)
- func (c *Client) IncrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)
- func (c *Client) Info(ctx context.Context, key string, options ...OptionInfo) (Info, error)
- func (c *Client) MAdd(ctx context.Context, s []Sample) ([]MultiResult, error)
- func (c *Client) MGet(ctx context.Context, filters []Filter, options ...OptionMGet) ([]LastDatapoint, error)
- func (c *Client) MRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, ...) ([]TimeSeries, error)
- func (c *Client) MRevRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, ...) ([]TimeSeries, error)
- func (c *Client) QueryIndex(ctx context.Context, filters []Filter) ([]string, error)
- func (c *Client) Range(ctx context.Context, key string, from Timestamp, to Timestamp, ...) ([]DataPoint, error)
- func (c *Client) RevRange(ctx context.Context, key string, from Timestamp, to Timestamp, ...) ([]DataPoint, error)
- type DataPoint
- type Doer
- type DuplicatePolicy
- type Encoding
- type Filter
- type GroupBy
- type Info
- type Labels
- type LastDatapoint
- type MultiResult
- type OptionAdd
- type OptionAlter
- type OptionCounter
- type OptionCreate
- type OptionInfo
- type OptionMGet
- type OptionMRanger
- func MRangerWithAggregation(t AggregationType, timeBucket time.Duration) OptionMRanger
- func MRangerWithAlign(a Timestamp) OptionMRanger
- func MRangerWithCount(c int64) OptionMRanger
- func MRangerWithGroupBy(label string, reducer ReducerType) OptionMRanger
- func MRangerWithLabels(labels ...string) OptionMRanger
- func MRangerWithTSFilter(tss ...time.Time) OptionMRanger
- func MRangerWithValueFilter(min float64, max float64) OptionMRanger
- type OptionRanger
- type ReducerType
- type Rule
- type Sample
- type TS
- type TimeSeries
- type Timestamp
- type TimestampAuto
- type TimestampMax
- type TimestampMin
Constants ¶
const ( // EncodingCompressed applies the DoubleDelta compression to the series samples. EncodingCompressed = Encoding("COMPRESSED") // EncodingUncompressed keeps the raw samples in memory. EncodingUncompressed = Encoding("UNCOMPRESSED") )
const ( // DuplicatePolicyBlock raises an error for any out of order sample. DuplicatePolicyBlock = DuplicatePolicy("BLOCK") // DuplicatePolicyFirst ignores the new value. DuplicatePolicyFirst = DuplicatePolicy("FIRST") // DuplicatePolicyLast overrides with the latest value. DuplicatePolicyLast = DuplicatePolicy("LAST") // DuplicatePolicyMin only overrides if the value is lower than the existing value. DuplicatePolicyMin = DuplicatePolicy("MIN") // DuplicatePolicyMax only overrides if the value is higher than the existing value. DuplicatePolicyMax = DuplicatePolicy("MAX") // DuplicatePolicySum in case a previous sample exists, adds the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. DuplicatePolicySum = DuplicatePolicy("SUM") )
const ( ReducerSum = ReducerType("SUM") ReducerMin = ReducerType("MIN") ReducerMax = ReducerType("MAX") )
const ( AggregationTypeAvg = AggregationType("AVG") AggregationTypeSum = AggregationType("SUM") AggregationTypeMin = AggregationType("MIN") AggregationTypeMax = AggregationType("MAX") AggregationTypeRange = AggregationType("RANGE") AggregationTypeCount = AggregationType("COUNT") AggregationTypeFirst = AggregationType("FIRST") AggregationTypeLast = AggregationType("LAST") AggregationTypeStdP = AggregationType("STD.P") AggregationTypeStdS = AggregationType("STD.S") AggregationTypeVarP = AggregationType("VAR.P") AggregationTypeVarS = AggregationType("VAR.S") )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Aggregation ¶
type Aggregation struct {
Type AggregationType
TimeBucket time.Duration
}
type AggregationType ¶
type AggregationType string
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CreateRule ¶
func (c *Client) CreateRule(ctx context.Context, srcKey, destKey string, a AggregationType, timeBucket time.Duration) error
CreateRule creates a compaction rule.
func (*Client) DecrBy ¶
func (c *Client) DecrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)
DecrBy creates a new sample that decrements the latest sample's value.
func (*Client) DeleteRule ¶
DeleteRule deletes a compaction rule.
func (*Client) IncrBy ¶
func (c *Client) IncrBy(ctx context.Context, key string, value float64, options ...OptionCounter) (time.Time, error)
IncrBy creates a new sample that increments the latest sample's value.
func (*Client) MGet ¶
func (c *Client) MGet(ctx context.Context, filters []Filter, options ...OptionMGet) ([]LastDatapoint, error)
MGet gets the last samples matching the specific filter.
func (*Client) MRange ¶
func (c *Client) MRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, options ...OptionMRanger) ([]TimeSeries, error)
MRange queries a range across multiple time-series by filters in forward direction.
func (*Client) MRevRange ¶
func (c *Client) MRevRange(ctx context.Context, from Timestamp, to Timestamp, filters []Filter, options ...OptionMRanger) ([]TimeSeries, error)
MRevRange queries a range across multiple time-series by filters in reverse direction.
func (*Client) QueryIndex ¶
QueryIndex lists all the keys matching the filter list.
type DuplicatePolicy ¶
type DuplicatePolicy string
type GroupBy ¶
type GroupBy struct {
Label string
Reducer ReducerType
}
type LastDatapoint ¶
type MultiResult ¶
type MultiResult struct {
// contains filtered or unexported fields
}
MultiResult contains an error when a specific Sample triggers an error.
func (MultiResult) Err ¶
func (r MultiResult) Err() error
func (MultiResult) Time ¶
func (r MultiResult) Time() time.Time
type OptionAdd ¶
type OptionAdd func(cmd *cmdAdd)
func AddWithChunkSize ¶
func AddWithEncoding ¶
func AddWithLabels ¶
func AddWithOnDuplicate ¶
func AddWithOnDuplicate(dp DuplicatePolicy) OptionAdd
func AddWithRetention ¶
type OptionAlter ¶
type OptionAlter func(cmd *cmdAlter)
func AlterWithLabels ¶
func AlterWithLabels(ls Labels) OptionAlter
func AlterWithRetention ¶
func AlterWithRetention(r time.Duration) OptionAlter
type OptionCounter ¶
type OptionCounter func(cmd *cmdCounter)
func CounterWithChunkSize ¶
func CounterWithChunkSize(cs int) OptionCounter
func CounterWithEncoding ¶
func CounterWithEncoding(e Encoding) OptionCounter
func CounterWithLabels ¶
func CounterWithLabels(ls Labels) OptionCounter
func CounterWithRetention ¶
func CounterWithRetention(r time.Duration) OptionCounter
func CounterWithTimestamp ¶
func CounterWithTimestamp(t time.Time) OptionCounter
type OptionCreate ¶
type OptionCreate func(cmd *cmdCreate)
func CreateWithChunkSize ¶
func CreateWithChunkSize(cs int) OptionCreate
func CreateWithDuplicatePolicy ¶
func CreateWithDuplicatePolicy(dp DuplicatePolicy) OptionCreate
func CreateWithEncoding ¶
func CreateWithEncoding(e Encoding) OptionCreate
func CreateWithLabels ¶
func CreateWithLabels(ls Labels) OptionCreate
func CreateWithRetention ¶
func CreateWithRetention(r time.Duration) OptionCreate
type OptionInfo ¶
type OptionInfo func(cmd *cmdInfo)
func InfoWithDebug ¶
func InfoWithDebug() OptionInfo
type OptionMGet ¶
type OptionMGet func(cmd *cmdMGet)
func MGetWithLabels ¶
func MGetWithLabels(labels ...string) OptionMGet
type OptionMRanger ¶
type OptionMRanger func(cmd *cmdMRanger)
func MRangerWithAggregation ¶
func MRangerWithAggregation(t AggregationType, timeBucket time.Duration) OptionMRanger
func MRangerWithAlign ¶
func MRangerWithAlign(a Timestamp) OptionMRanger
func MRangerWithCount ¶
func MRangerWithCount(c int64) OptionMRanger
func MRangerWithGroupBy ¶
func MRangerWithGroupBy(label string, reducer ReducerType) OptionMRanger
func MRangerWithLabels ¶
func MRangerWithLabels(labels ...string) OptionMRanger
func MRangerWithTSFilter ¶
func MRangerWithTSFilter(tss ...time.Time) OptionMRanger
func MRangerWithValueFilter ¶
func MRangerWithValueFilter(min float64, max float64) OptionMRanger
type OptionRanger ¶
type OptionRanger func(cmd *cmdRanger)
func RangerWithAggregation ¶
func RangerWithAggregation(t AggregationType, timeBucket time.Duration) OptionRanger
func RangerWithAlign ¶
func RangerWithAlign(a Timestamp) OptionRanger
func RangerWithCount ¶
func RangerWithCount(c int64) OptionRanger
func RangerWithTSFilter ¶
func RangerWithTSFilter(tss ...time.Time) OptionRanger
func RangerWithValueFilter ¶
func RangerWithValueFilter(min float64, max float64) OptionRanger
type ReducerType ¶
type ReducerType string
type Rule ¶
type Rule struct {
Key string
Aggregation Aggregation
}