Documentation ¶
Overview ¶
GTRS (Go Typed Redis Streams) is a library for easily reading Redis streams with or without consumer groups. See https://github.com/dranikpg/gtrs for a quick tutorial.
Index ¶
- Variables
- type AckError
- type Consumer
- type ConvertibleFrom
- type ConvertibleTo
- type FieldParseError
- type GroupConsumer
- type GroupConsumerConfig
- type Message
- type ParseError
- type ReadError
- type Stream
- func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error)
- func (s Stream[T]) Key() string
- func (s Stream[T]) Len(ctx context.Context) (int64, error)
- func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)
- func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)
- type StreamConsumer
- type StreamConsumerConfig
- type StreamIDs
Constants ¶
This section is empty.
Variables ¶
var ErrAckBadRetVal = errors.New("XAck made no acknowledgement")
ErrAckBadRetVal is returned when XACK rejects a request by returning 0. This usually indicates that the ID is wrong or the stream has no groups.
var NoExpiration = time.Duration(0)
Functions ¶
This section is empty.
Types ¶
type AckError ¶
type AckError struct {
	Err error
}
AckError indicates that an acknowledgement request failed.
type ConvertibleFrom ¶
ConvertibleFrom is implemented by types that can load themselves from a map.
type ConvertibleTo ¶
ConvertibleTo is implemented by types that can convert themselves to a map.
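The method sets of these two interfaces are not shown on this page. The sketch below assumes a `ToMap() map[string]any` and `FromMap(map[string]any) error` pair and declares local stand-in interfaces with those shapes; `Event` is a hypothetical message type. Check the library source for the exact signatures before relying on this.

```go
package main

import (
	"fmt"
	"strconv"
)

// Assumed shapes of the two interfaces (local stand-ins, not the library's
// own declarations).
type ConvertibleTo interface {
	ToMap() map[string]any
}

type ConvertibleFrom interface {
	FromMap(map[string]any) error
}

// Event is a hypothetical message type used only for illustration.
type Event struct {
	Name  string
	Count int
}

// ToMap flattens the event into string fields, as a stream entry would hold.
func (e Event) ToMap() map[string]any {
	return map[string]any{
		"name":  e.Name,
		"count": strconv.Itoa(e.Count),
	}
}

// FromMap loads the event from a field map and reports malformed fields.
func (e *Event) FromMap(m map[string]any) error {
	name, ok := m["name"].(string)
	if !ok {
		return fmt.Errorf("field %q is missing or not a string", "name")
	}
	raw, ok := m["count"].(string)
	if !ok {
		return fmt.Errorf("field %q is missing or not a string", "count")
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return fmt.Errorf("field %q: %w", "count", err)
	}
	e.Name, e.Count = name, n
	return nil
}

// Compile-time checks that Event satisfies the assumed interfaces.
var (
	_ ConvertibleTo   = Event{}
	_ ConvertibleFrom = (*Event)(nil)
)

func main() {
	var out Event
	if err := out.FromMap(Event{Name: "signup", Count: 3}.ToMap()); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

FromMap uses a pointer receiver because it mutates the value, while ToMap only reads it and can use a value receiver.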
type FieldParseError ¶
FieldParseError is returned by the default parser if data for a field is present, but:
- it's not assignable
- or the field is of an unsupported type
func (FieldParseError) Error ¶
func (fpe FieldParseError) Error() string
func (FieldParseError) Unwrap ¶
func (fpe FieldParseError) Unwrap() error
type GroupConsumer ¶
GroupConsumer is a consumer that reads from a consumer group and is otherwise similar to a StreamConsumer. Message acknowledgements can be sent asynchronously via Ack(). The consumer has to be closed to release resources and stop goroutines.
func NewGroupConsumer ¶
func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name, stream, lastID string, cfgs ...GroupConsumerConfig) *GroupConsumer[T]
NewGroupConsumer creates a new GroupConsumer with optional configuration.
func (*GroupConsumer[T]) Ack ¶
func (gc *GroupConsumer[T]) Ack(msg Message[T])
Ack requests an asynchronous XAck acknowledgement for the passed message.
NOTE: Ack can apply backpressure and block, so it should only be used inside the consumer loop, or while another goroutine handles errors from the consumer channel. Otherwise it may deadlock.
func (*GroupConsumer[T]) AwaitAcks ¶
func (gc *GroupConsumer[T]) AwaitAcks() []Message[T]
AwaitAcks blocks until all ack requests made so far have been processed and returns a slice of Messages carrying the AckErrors that occurred while waiting.
func (*GroupConsumer[T]) Chan ¶
func (gc *GroupConsumer[T]) Chan() <-chan Message[T]
Chan returns the main channel with new messages.
This channel is closed when:
- the consumer is closed
- immediately on context cancel
- in case of a ReadError
func (*GroupConsumer[T]) Close ¶
func (gc *GroupConsumer[T]) Close() []string
Close closes the consumer (if not already closed) and returns a slice of unprocessed ack requests. An ack request is unprocessed if it wasn't sent or its error wasn't consumed.
type GroupConsumerConfig ¶
type GroupConsumerConfig struct {
	StreamConsumerConfig
	AckBufferSize uint
}
GroupConsumerConfig provides basic configuration for GroupConsumer.
type ParseError ¶
type ParseError struct {
	Data map[string]interface{} // raw data returned by the redis client
	Err  error
}
ParseError indicates an error during parsing.
func (ParseError) Error ¶
func (pe ParseError) Error() string
func (ParseError) Unwrap ¶
func (pe ParseError) Unwrap() error
type ReadError ¶
type ReadError struct {
	Err error
}
ReadError indicates an error with the redis client.
After a consumer returns a ReadError, it closes its main channel.
type Stream ¶
type Stream[T any] struct {
	// contains filtered or unexported fields
}
Stream represents a redis stream with messages of type T.
func NewStream ¶
NewStream creates a new stream with messages of type T. TTL is an optional parameter that sets up expiration for stream messages. It only works as expected when a non-custom ID is used to Add a message. TTL can be zero to disable expiration. Note that expiration is applied when messages are Added, so Range requests won't clean up old messages.
type StreamConsumer ¶
StreamConsumer is a consumer that reads from one or multiple redis streams. The consumer has to be closed to release resources and stop goroutines.
func NewConsumer ¶
func NewConsumer[T any](ctx context.Context, rdb redis.Cmdable, ids StreamIDs, cfgs ...StreamConsumerConfig) *StreamConsumer[T]
NewConsumer creates a new StreamConsumer with optional configuration.
func (*StreamConsumer[T]) Chan ¶
func (sc *StreamConsumer[T]) Chan() <-chan Message[T]
Chan returns the main channel with new messages.
This channel is closed when:
- the consumer is closed
- immediately on context cancel
- in case of a ReadError
func (*StreamConsumer[T]) Close ¶
func (sc *StreamConsumer[T]) Close() StreamIDs
Close returns a StreamIDs that shows up to which entry each stream was consumed.
The StreamIDs can be used to construct a new StreamConsumer that will pick up where this one left off.
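To make the resume flow concrete, the sketch below merges saved positions with a list of streams to read. It assumes that StreamIDs maps a stream key to the last consumed entry ID (the type definition is not shown on this page) and declares a local stand-in with that shape; `resumeFrom` is a hypothetical helper. The ID `"0"` is the usual Redis way to start from the beginning of a stream.

```go
package main

import (
	"fmt"
	"sort"
)

// StreamIDs is a local stand-in, assumed to map stream key -> last entry ID.
type StreamIDs = map[string]string

// resumeFrom builds start positions for a new consumer: each stream resumes
// from its saved ID if there is one, and from the beginning ("0") otherwise.
func resumeFrom(streams []string, saved StreamIDs) StreamIDs {
	ids := make(StreamIDs, len(streams))
	for _, s := range streams {
		if last, ok := saved[s]; ok {
			ids[s] = last
		} else {
			ids[s] = "0"
		}
	}
	return ids
}

func main() {
	// Pretend this came back from Close() on a previous consumer.
	saved := StreamIDs{"orders": "1700000030000-2"}

	ids := resumeFrom([]string{"orders", "payments"}, saved)

	// Print in a stable order, since map iteration order is random.
	keys := make([]string, 0, len(ids))
	for k := range ids {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, ids[k])
	}
}
```

The resulting map is the kind of value that would be passed as the `ids` argument of NewConsumer.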
type StreamConsumerConfig ¶
type StreamConsumerConfig struct {
	Block      time.Duration // milliseconds to block before timing out. 0 means infinite
	Count      int64         // maximum number of entries per request. 0 means not limited
	BufferSize uint          // how many entries to prefetch at most
}
StreamConsumerConfig provides basic configuration for StreamConsumer. It can be passed as the last argument to NewConsumer.