Documentation ¶
Overview ¶
GTRS (Go Typed Redis Streams) is a library for easily reading Redis streams with or without consumer groups. See https://github.com/dranikpg/gtrs for a quick tutorial.
Index ¶
- Variables
- type AckError
- type AsciiTime
- type Consumer
- type ConvertibleFrom
- type ConvertibleTo
- type FieldParseError
- type GroupConsumer
- type GroupConsumerConfig
- type InnerAck
- type Message
- type Metadata
- type Options
- type ParseError
- type ReadError
- type SerializeError
- type Stream
- func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error)
- func (s Stream[T]) Key() string
- func (s Stream[T]) Len(ctx context.Context) (int64, error)
- func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)
- func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)
- type StreamConsumer
- type StreamConsumerConfig
- type StreamIDs
Constants ¶
This section is empty.
Variables ¶
var ErrAckBadRetVal = errors.New("XAck made no acknowledgement")
ErrAckBadRetVal is caused by XACK not accepting an request by returning 0. This usually indicates that the id is wrong or the stream has no groups.
var NoExpiration = time.Duration(0)
var NoMaxLen = int64(0)
Functions ¶
This section is empty.
Types ¶
type AckError ¶
type AckError struct {
Err error
}
AckError indicates that an acknowledgement request failed.
type AsciiTime ¶ added in v0.6.0
type AsciiTime = gtrsconvert.AsciiTime
AsciiTime is a type that wraps time.Time, however the (Un)[M/m]arshalbinary functions are overridden to marshal in the same format as Text
type ConvertibleFrom ¶
type ConvertibleFrom = gtrsconvert.ConvertibleFrom
ConvertibleFrom is implemented by types that can load themselves from a map.
type ConvertibleTo ¶
type ConvertibleTo = gtrsconvert.ConvertibleTo
ConvertibleTo is implemented by types that can convert themselves to a map.
type FieldParseError ¶
type FieldParseError = gtrsconvert.FieldParseError
FieldParseError is returned with a field fails to be parsed
type GroupConsumer ¶
GroupConsumer is a consumer that reads from a consumer group and similar to a StreamConsumer. Message acknowledgements can be sent asynchronously via Ack(). The consumer has to be closed to release resources and stop goroutines.
func NewGroupConsumer ¶
func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name, stream, lastID string, cfgs ...GroupConsumerConfig) *GroupConsumer[T]
NewGroupConsumer creates a new GroupConsumer with optional configuration. Only the first configuration element is use.
func NewGroupMultiStreamConsumer ¶ added in v0.6.0
func NewGroupMultiStreamConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name string, seenIds StreamIDs, cfgs ...GroupConsumerConfig) *GroupConsumer[T]
NewGroupMultiStreamConsumer creates a new GroupConsumer with optional configuration. Only the first configuration element is use.
func (*GroupConsumer[T]) Ack ¶
func (gc *GroupConsumer[T]) Ack(msg Message[T])
Ack requests an asynchronous XAck acknowledgement request for the passed message.
NOTE: Ack sometimes provides backpressure, so it should be only used inside the consumer loop or with another goroutine handling errors from the consumer channel. Otherwise it may deadlock.
func (*GroupConsumer[T]) AwaitAcks ¶
func (gc *GroupConsumer[T]) AwaitAcks() []Message[T]
AwaitAcks blocks until all so far requested ack requests are processed and returns a slice of Messages with AckErrors that happened during wait.
func (*GroupConsumer[T]) Chan ¶
func (gc *GroupConsumer[T]) Chan() <-chan Message[T]
Chan returns the main channel with new messages.
This channel is closed when: - the consumer is closed - immediately on context cancel - in case of a ReadError - in case Ack fail, AckError is return
func (*GroupConsumer[T]) Close ¶
func (gc *GroupConsumer[T]) Close() []InnerAck
Close closes the consumer (if not already closed) and returns a slice of unprocessed ack requests. An ack request in unprocessed if it wasn't sent or its error wasn't consumed.
type GroupConsumerConfig ¶
type GroupConsumerConfig struct { StreamConsumerConfig AckBufferSize uint }
GroupConsumerConfig provides basic configuration for GroupConsumer.
type Metadata ¶ added in v0.3.0
type Metadata = gtrsconvert.Metadata
Metadata is a type that allows serialization of generic structured metadata within the stream entries. Any value that can be serialized to JSON can be inserted here.
type Options ¶ added in v0.3.0
type Options struct { // TTL is an optional parameter to specify how long entries stay in the stream before expiring, // it only only works as expected when a non-custom id is used to Add a message. // The default is No Expiration. // Note that TTL is performed when messages are Added, so Range requests won't clean up old messages. TTL time.Duration // MaxLen is an optional parameter to specify the maximum length of the stream. MaxLen int64 // Approx causes MaxLen and TTL to be approximate instead of exact. Approx bool }
type ParseError ¶
type ParseError struct { Data map[string]interface{} // raw data returned by the redis client Err error }
ParseError indicates an error during parsing.
func (ParseError) Error ¶
func (pe ParseError) Error() string
func (ParseError) Unwrap ¶
func (pe ParseError) Unwrap() error
type ReadError ¶
type ReadError struct {
Err error
}
ReadError indicates an erorr with the redis client.
After a ReadError was returned from a consumer, it'll close its main channel.
type SerializeError ¶ added in v0.3.0
type SerializeError = gtrsconvert.SerializeError
SerializeError is returned with a field fails to be serialized
type Stream ¶
type Stream[T any] struct { // contains filtered or unexported fields }
Stream represents a redis stream with messages of type T.
func NewStream ¶
NewStream create a new stream with messages of type T. Options are optional (the parameter can be nil to use defaults).
type StreamConsumer ¶
StreamConsumer is a consumer that reads from one or multiple redis streams. The consumer has to be closed to release resources and stop goroutines.
func NewConsumer ¶
func NewConsumer[T any](ctx context.Context, rdb redis.Cmdable, ids StreamIDs, cfgs ...StreamConsumerConfig) *StreamConsumer[T]
NewConsumer creates a new StreamConsumer with optional configuration.
func (*StreamConsumer[T]) Chan ¶
func (sc *StreamConsumer[T]) Chan() <-chan Message[T]
Chan returns the main channel with new messages.
This channel is closed when: - the consumer is closed - immediately on context cancel - in case of a ReadError
func (*StreamConsumer[T]) Close ¶
func (sc *StreamConsumer[T]) Close() StreamIDs
Close returns a StreamIds that shows, up to which entry the streams were consumed.
The StreamIds can be used to construct a new StreamConsumer that will pick up where this left off.
type StreamConsumerConfig ¶
type StreamConsumerConfig struct { Block time.Duration // milliseconds to block before timing out. 0 means infinite Count int64 // maximum number of entries per request. 0 means not limited BufferSize uint // how many entries to prefetch at most }
StreamConsumerConfig provides basic configuration for StreamConsumer. It can be passed as the last argument to NewConsumer.