Documentation ¶
Index ¶
Constants ¶
View Source
const ( DefaultGroup = "_tigris_group" DefaultConsumer = "_tigris_consumer" )
View Source
const (
// ConsumerGroupDefaultCurrentPos is for creating a consumer group that sets the position as current.
ConsumerGroupDefaultCurrentPos = "$"
)
Variables ¶
View Source
var ( // ErrStreamAlreadyExists is returned when a stream already exists. ErrStreamAlreadyExists = NewCacheError(ErrCodeStreamExists, "stream already exists") // ErrStreamNotFound is returned when a stream does not exist. ErrStreamNotFound = NewCacheError(ErrCodeStreamNotFound, "stream not found") ErrKeyNotFound = NewCacheError(ErrCodeKeyNotFound, "key not found") ErrKeyAlreadyExists = NewCacheError(ErrCodeKeyAlreadyExists, "key already exists") ErrEmptyKey = NewCacheError(ErrCodeEmptyKey, "key is empty") )
View Source
var BlockReadGroupDuration = 180 * time.Second
Functions ¶
func IsStreamAlreadyExists ¶
func NewCacheError ¶
Types ¶
type Cache ¶
type Cache interface { Set(ctx context.Context, tableName string, key string, value *internal.CacheData, options *SetOptions) error // GetSet is to get the previous value and set the new value GetSet(ctx context.Context, tableName string, key string, value *internal.CacheData) (*internal.CacheData, error) // Get the value of key Get(ctx context.Context, tableName string, key string, options *GetOptions) (*internal.CacheData, error) // Delete deletes one or more keys Delete(ctx context.Context, tableName string, keys ...string) (int64, error) // Exists returns if the key exists, for multiple keys it returns the count of the number of keys that exists Exists(ctx context.Context, tableName string, key ...string) (int64, error) Keys(ctx context.Context, tableName string, pattern string) ([]string, error) Scan(ctx context.Context, tableName string, cursor uint64, count int64, pattern string) ([]string, uint64) // CreateStream creates and returns a stream object, throws an error if stream already exists CreateStream(ctx context.Context, streamName string) (Stream, error) // CreateOrGetStream creates or returns an existing stream CreateOrGetStream(ctx context.Context, streamName string) (Stream, error) // GetStream only returns a stream if the stream exists in the Cache GetStream(ctx context.Context, streamName string) (Stream, error) // ListStreams returns the all the streams with the prefix ListStreams(ctx context.Context, streamNamePrefix string) ([]string, error) // DeleteStream to delete a stream if exists DeleteStream(ctx context.Context, streamName string) error }
func NewCache ¶
func NewCache(cfg *config.CacheConfig) Cache
type GetOptions ¶
type ReadGroupPos ¶
type ReadGroupPos string
const ( // ReadGroupPosStart is to let steam know that reads needs to happen since beginning. ReadGroupPosStart ReadGroupPos = "0-0" // ReadGroupPosCurrent is to let stream know that reads needs to happen from current. ReadGroupPosCurrent ReadGroupPos = ">" )
type SetOptions ¶
type SetOptions struct { // NX is SetIfNotExists i.e. only set the key if it does not already exist. NX bool // XX is SetIfExists i.e. only set the key if it already exists. XX bool // EX sets the Expiry time of the key in second EX uint64 // PX sets the Expiry time of the key in millisecond PX uint64 }
type Stream ¶
type Stream interface { Name() string // Add is to add streamData to a stream Add(ctx context.Context, value *internal.StreamData) (string, error) // Read data from the stream, returns data ID greater than position. To read from current use "$" Read(ctx context.Context, pos string) (*StreamMessages, bool, error) // ReadGroup is similar to Read but with support for reading from a group. We don't have multiple consumers in a // single group. Currently, it creates an internal _tigris_consumer. ReadGroup(ctx context.Context, group string, pos ReadGroupPos) (*StreamMessages, bool, error) // CreateConsumerGroup creates a consumer group and attach it to the stream. The pos is used to specify the position // for this consumer group. CreateConsumerGroup(ctx context.Context, group string, pos string) error // RemoveConsumerGroup removes consumer group from this stream. RemoveConsumerGroup(ctx context.Context, group string) error // GetConsumerGroups returns all the consumer groups attached to this stream GetConsumerGroups(ctx context.Context) ([]xredis.XInfoGroup, error) // GetConsumerGroup returns only information about the consumer group passed in the API. GetConsumerGroup(ctx context.Context, group string) (*xredis.XInfoGroup, bool, error) // SetID is used to set the position of the group again. SetID(ctx context.Context, group string, pos string) error // Ack is to acknowledge messages once they are read by the consumer group. This is required to be called in case // ReadGroup is used so that messages doesn't end up in pending entries list. Ack(ctx context.Context, group string, ids ...string) error // Delete is to delete this stream. it removes all the associated consumer group as well. Delete(ctx context.Context) error }
type StreamMessages ¶
func (*StreamMessages) Decode ¶
func (sm *StreamMessages) Decode(message xredis.XMessage) (*internal.StreamData, error)
Click to show internal directories.
Click to hide internal directories.