om

package
v1.0.25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

README

Generic Object Mapping

The om.NewHashRepository and om.NewJSONRepository creates an OM repository backed by redis hash or RedisJSON.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Datadog/rueidis"
    "github.com/Datadog/rueidis/om"
)

type Example struct {
    Key  string    `json:"key" redis:",key"`   // the redis:",key" is required to indicate which field is the ULID key
    Ver  int64     `json:"ver" redis:",ver"`   // the redis:",ver" is required to do optimistic locking to prevent lost update
    ExAt time.Time `json:"exat" redis:",exat"` // the redis:",exat" is optional for setting record expiry with unix timestamp
    Str  string    `json:"str"`                // both NewHashRepository and NewJSONRepository use json tag as field name
}

func main() {
    ctx := context.Background()
    c, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
    if err != nil {
        panic(err)
    }
    // create the repo with NewHashRepository or NewJSONRepository
    repo := om.NewHashRepository("my_prefix", Example{}, c)

    exp := repo.NewEntity()
    exp.Str = "mystr"
    exp.ExAt = time.Now().Add(time.Hour)
    fmt.Println(exp.Key) // output 01FNH4FCXV9JTB9WTVFAAKGSYB
    repo.Save(ctx, exp) // success

    // lookup "my_prefix:01FNH4FCXV9JTB9WTVFAAKGSYB" through client side caching
    exp2, _ := repo.FetchCache(ctx, exp.Key, time.Second*5)
    fmt.Println(exp2.Str) // output "mystr", which equals to exp.Str

    exp2.Ver = 0         // if someone changes the version during your GET then SET operation,
    repo.Save(ctx, exp2) // the save will fail with ErrVersionMismatch.
}

Object Mapping + RediSearch

If you have RediSearch, you can create and search the repository against the index.

if _, ok := repo.(*om.HashRepository[Example]); ok {
    repo.CreateIndex(ctx, func(schema om.FtCreateSchema) rueidis.Completed {
        return schema.FieldName("str").Tag().Build() // Note that the Example.Str field is mapped to str on redis by its json tag
    })
}

if _, ok := repo.(*om.JSONRepository[Example]); ok {
    repo.CreateIndex(ctx, func(schema om.FtCreateSchema) rueidis.Completed {
        return schema.FieldName("$.str").As("str").Tag().Build() // the FieldName of a json index should be a json path syntax
    })
}

exp := repo.NewEntity()
exp.Str = "special_chars:[$.-]"
repo.Save(ctx, exp)

n, records, _ := repo.Search(ctx, func(search om.FtSearchIndex) rueidis.Completed {
    // Note that by using the named parameters with DIALECT >= 2, you won't have to escape chars for building queries.
    return search.Query("@str:{$v}").Params().Nargs(2).NameValue().NameValue("v", exp.Str).Dialect(2).Build()
})

fmt.Println("total", n) // n is total number of results matched in redis, which is >= len(records)

for _, v := range records {
    fmt.Println(v.Str) // print "special_chars:[$.-]"
}
Change Search Index Name

The default index name for HashRepository and JSONRepository is hashidx:{prefix} and jsonidx:{prefix} respectively.

They can be changed by WithIndexName option to allow searching difference indexes:

repo1 := om.NewHashRepository("my_prefix", Example{}, c, om.WithIndexName("my_index1"))
repo2 := om.NewHashRepository("my_prefix", Example{}, c, om.WithIndexName("my_index2"))
Object Expiry Timestamp

Setting a redis:",exat" tag on a time.Time field will set PEXPIREAT on the record accordingly when calling .Save().

If the time.Time is zero, then the expiry will be untouched when calling .Save().

Object Mapping Limitation

NewHashRepository only accepts these field types:

  • string, *string
  • int64, *int64
  • bool, *bool
  • []byte, json.RawMessage
  • []float32, []float64 for vector search
  • json.Marshaler+json.Unmarshaler

Field projection by RediSearch is not supported.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrVersionMismatch indicates that the optimistic update failed. That is someone else had already changed the entity.
	ErrVersionMismatch = errors.New("object version mismatched, please retry")
	// ErrEmptyHashRecord indicates the requested hash entity is not found.
	ErrEmptyHashRecord = errors.New("hash object not found")
)
View Source
var EndOfCursor = errors.New("end of cursor")

Functions

func IsRecordNotFound

func IsRecordNotFound(err error) bool

IsRecordNotFound checks if the error is indicating the requested entity is not found.

Types

type AggregateCursor

type AggregateCursor struct {
	// contains filtered or unexported fields
}

AggregateCursor unifies the response of FT.AGGREGATE with or without WITHCURSOR

func (*AggregateCursor) Del

func (c *AggregateCursor) Del(ctx context.Context) (err error)

Del uses FT.CURSOR DEL to destroy the cursor

func (*AggregateCursor) Read

func (c *AggregateCursor) Read(ctx context.Context) (partial []map[string]string, err error)

Read return the partial result from the initial FT.AGGREGATE This may invoke FT.CURSOR READ to retrieve further result

func (*AggregateCursor) Total

func (c *AggregateCursor) Total() int64

Total return the total numbers of record of the initial FT.AGGREGATE result

type Arbitrary

type Arbitrary = cmds.Arbitrary

Arbitrary is alias to cmds.Arbitrary. This allows user build arbitrary command in Repository.CreateIndex

type FtAggregateIndex

type FtAggregateIndex = cmds.FtAggregateIndex

FtAggregateIndex is the FT.AGGREGATE command builder

type FtCreateSchema

type FtCreateSchema = cmds.FtCreateSchema

FtCreateSchema is the FT.CREATE command builder

type FtSearchIndex

type FtSearchIndex = cmds.FtSearchIndex

FtSearchIndex is the FT.SEARCH command builder

type HashRepository

type HashRepository[T any] struct {
	// contains filtered or unexported fields
}

HashRepository is an OM repository backed by redis hash.

func (*HashRepository[T]) Aggregate

func (r *HashRepository[T]) Aggregate(ctx context.Context, cmdFn func(agg FtAggregateIndex) rueidis.Completed) (cursor *AggregateCursor, err error)

Aggregate performs the FT.AGGREGATE and returns a *AggregateCursor for accessing the results

func (*HashRepository[T]) CreateIndex

func (r *HashRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error

CreateIndex uses FT.CREATE from the RediSearch module to create inverted index under the name `hashidx:{prefix}` You can use the cmdFn parameter to mutate the index construction command.

func (*HashRepository[T]) DropIndex

func (r *HashRepository[T]) DropIndex(ctx context.Context) error

DropIndex uses FT.DROPINDEX from the RediSearch module to drop index whose name is `hashidx:{prefix}`

func (*HashRepository[T]) Fetch

func (r *HashRepository[T]) Fetch(ctx context.Context, id string) (v *T, err error)

Fetch an entity whose name is `{prefix}:{id}`

func (*HashRepository[T]) FetchCache

func (r *HashRepository[T]) FetchCache(ctx context.Context, id string, ttl time.Duration) (v *T, err error)

FetchCache is like Fetch, but it uses client side caching mechanism.

func (*HashRepository[T]) IndexName

func (r *HashRepository[T]) IndexName() string

IndexName returns the index name used in the FT.CREATE

func (*HashRepository[T]) NewEntity

func (r *HashRepository[T]) NewEntity() (entity *T)

NewEntity returns an empty entity and will have the `redis:",key"` field be set with ULID automatically.

func (*HashRepository[T]) Remove

func (r *HashRepository[T]) Remove(ctx context.Context, id string) error

Remove the entity under the redis key of `{prefix}:{id}`.

func (*HashRepository[T]) Save

func (r *HashRepository[T]) Save(ctx context.Context, entity *T) (err error)

Save the entity under the redis key of `{prefix}:{id}`. It also uses the `redis:",ver"` field and lua script to perform optimistic locking and prevent lost update.

func (*HashRepository[T]) SaveMulti

func (r *HashRepository[T]) SaveMulti(ctx context.Context, entities ...*T) []error

SaveMulti batches multiple HashRepository.Save at once

func (*HashRepository[T]) Search

func (r *HashRepository[T]) Search(ctx context.Context, cmdFn func(search FtSearchIndex) rueidis.Completed) (n int64, s []*T, err error)

Search uses FT.SEARCH from the RediSearch module to search the index whose name is `hashidx:{prefix}` It returns three values: 1. total count of match results inside the redis, and note that it might be larger than returned search result. 2. search result, and note that its length might smaller than the first return value. 3. error if any You can use the cmdFn parameter to mutate the search command.

type JSONRepository

type JSONRepository[T any] struct {
	// contains filtered or unexported fields
}

JSONRepository is an OM repository backed by RedisJSON.

func (*JSONRepository[T]) Aggregate

func (r *JSONRepository[T]) Aggregate(ctx context.Context, cmdFn func(agg FtAggregateIndex) rueidis.Completed) (cursor *AggregateCursor, err error)

Aggregate performs the FT.AGGREGATE and returns a *AggregateCursor for accessing the results

func (*JSONRepository[T]) CreateIndex

func (r *JSONRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error

CreateIndex uses FT.CREATE from the RediSearch module to create inverted index under the name `jsonidx:{prefix}` You can use the cmdFn parameter to mutate the index construction command, and note that the field name should be specified with JSON path syntax, otherwise the index may not work as expected.

func (*JSONRepository[T]) DropIndex

func (r *JSONRepository[T]) DropIndex(ctx context.Context) error

DropIndex uses FT.DROPINDEX from the RediSearch module to drop index whose name is `jsonidx:{prefix}`

func (*JSONRepository[T]) Fetch

func (r *JSONRepository[T]) Fetch(ctx context.Context, id string) (v *T, err error)

Fetch an entity whose name is `{prefix}:{id}`

func (*JSONRepository[T]) FetchCache

func (r *JSONRepository[T]) FetchCache(ctx context.Context, id string, ttl time.Duration) (v *T, err error)

FetchCache is like Fetch, but it uses client side caching mechanism.

func (*JSONRepository[T]) IndexName

func (r *JSONRepository[T]) IndexName() string

IndexName returns the index name used in the FT.CREATE

func (*JSONRepository[T]) NewEntity

func (r *JSONRepository[T]) NewEntity() *T

NewEntity returns an empty entity and will have the `redis:",key"` field be set with ULID automatically.

func (*JSONRepository[T]) Remove

func (r *JSONRepository[T]) Remove(ctx context.Context, id string) error

Remove the entity under the redis key of `{prefix}:{id}`.

func (*JSONRepository[T]) Save

func (r *JSONRepository[T]) Save(ctx context.Context, entity *T) (err error)

Save the entity under the redis key of `{prefix}:{id}`. It also uses the `redis:",ver"` field and lua script to perform optimistic locking and prevent lost update.

func (*JSONRepository[T]) SaveMulti

func (r *JSONRepository[T]) SaveMulti(ctx context.Context, entities ...*T) []error

SaveMulti batches multiple HashRepository.Save at once

func (*JSONRepository[T]) Search

func (r *JSONRepository[T]) Search(ctx context.Context, cmdFn func(search FtSearchIndex) rueidis.Completed) (n int64, s []*T, err error)

Search uses FT.SEARCH from the RediSearch module to search the index whose name is `jsonidx:{prefix}` It returns three values: 1. total count of match results inside the redis, and note that it might be larger than returned search result. 2. search result, and note that its length might smaller than the first return value. 3. error if any You can use the cmdFn parameter to mutate the search command.

type Repository

type Repository[T any] interface {
	NewEntity() (entity *T)
	Fetch(ctx context.Context, id string) (*T, error)
	FetchCache(ctx context.Context, id string, ttl time.Duration) (v *T, err error)
	Search(ctx context.Context, cmdFn func(search FtSearchIndex) rueidis.Completed) (int64, []*T, error)
	Aggregate(ctx context.Context, cmdFn func(agg FtAggregateIndex) rueidis.Completed) (*AggregateCursor, error)
	Save(ctx context.Context, entity *T) (err error)
	SaveMulti(ctx context.Context, entity ...*T) (errs []error)
	Remove(ctx context.Context, id string) error
	CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error
	DropIndex(ctx context.Context) error
	IndexName() string
}

Repository is backed by HashRepository or JSONRepository

func NewHashRepository

func NewHashRepository[T any](prefix string, schema T, client rueidis.Client, opts ...RepositoryOption) Repository[T]

NewHashRepository creates an HashRepository. The prefix parameter is used as redis key prefix. The entity stored by the repository will be named in the form of `{prefix}:{id}` The schema parameter should be a struct with fields tagged with `redis:",key"` and `redis:",ver"`

func NewJSONRepository

func NewJSONRepository[T any](prefix string, schema T, client rueidis.Client, opts ...RepositoryOption) Repository[T]

NewJSONRepository creates an JSONRepository. The prefix parameter is used as redis key prefix. The entity stored by the repository will be named in the form of `{prefix}:{id}` The schema parameter should be a struct with fields tagged with `redis:",key"` and `redis:",ver"`

type RepositoryOption

type RepositoryOption func(Repository[any])

func WithIndexName

func WithIndexName(name string) RepositoryOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL