gtrs

v0.2.2
Published: Mar 30, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

README

Go Typed Redis Streams

Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?

consumer := NewGroupConsumer[MyType](...)
for msg := range consumer.Chan() {
  // Handle message
  consumer.Ack(msg)
}

Wait...it is! 🔥

Quickstart

Define a type that represents your stream data. It'll be parsed automatically with all field names converted to snake case. Missing fields will be skipped silently. You can also use the ConvertibleFrom and ConvertibleTo interfaces to do custom parsing.

// maps to {"name": , "priority": }
type Event struct {
  Name     string
  Priority int
}

Consumers

Consumers allow reading Redis streams through Go channels. Specify a context, a Redis client and where to start reading. Pass a StreamConsumerConfig if you don't like the defaults or want optimal performance. New entries are fetched asynchronously to provide a fast flow 🚂

cs := NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})

for msg := range cs.Chan() {
  if msg.Err != nil {
    continue
  }
  var event Event = msg.Data
}

Don't forget to Close() the consumer. If you want to start reading again where you left off, you can save the last StreamIDs.

ids := cs.Close()

Group Consumers

They work just like regular consumers and additionally allow sending acknowledgements asynchronously. Use Ack only while you keep processing new messages, that is, inside a consuming loop or from another goroutine. Even though this introduces a two-sided dependency, the consumer avoids deadlocks.

cs := NewGroupConsumer[Event](ctx, rdb, "group", "consumer", "stream", ">")

for msg := range cs.Chan() {
  cs.Ack(msg)
}

Stopped processing? Check your errors 🔎

// Wait for all acknowledgements to complete
errors := cs.AwaitAcks()

// Acknowledgements that were not sent yet or whose errors were not consumed
remaining := cs.Close()

Error handling

This is where the simplicity fades a little, but only a little :) The channel provides not just values, but also errors. They can be of only three types:

  • ReadError reports a failed XRead/XReadGroup request. The consumer closes the channel after this error
  • AckError reports a failed XAck request
  • ParseError speaks for itself

On cancellation, consumers send no error and close the channel immediately.

switch errv := msg.Err.(type) {
case nil: // This interface-nil comparison is safe
  fmt.Println("Got", msg.Data)
case ReadError:
  fmt.Println("ReadError caused by", errv.Err)
  return // last message in channel
case AckError:
  fmt.Printf("Ack failed %v-%v caused by %v\n", msg.Stream, msg.ID, errv.Err)
case ParseError:
  fmt.Println("Failed to parse", errv.Data)
}

All of these types wrap an underlying error. For example, ParseError can be unwrapped to:

  • Find out why the default parser failed via FieldParseError (e.g. assigning string to int field)
  • Catch custom errors from ConvertibleFrom

var fpe FieldParseError
if errors.As(msg.Err, &fpe) {
  fmt.Printf("Failed to parse field %v because %v", fpe.Field, fpe.Err)
}

errors.Is(msg.Err, errMyTypeFailedToParse)

Streams

Streams are simple wrappers for basic redis commands on a stream.

ttl := time.Hour
stream := NewStream[Event](rdb, "my-stream", ttl)
stream.Add(ctx, Event{
  Name:     "Example event",
  Priority: 1,
})

The TTL parameter will evict stream entries after the specified duration has elapsed (or it can be set to NoExpiration).

Installation
go get github.com/dranikpg/gtrs

Gtrs is still in its early stages and might change in further releases.

Examples
Performance
go test -run ^$ -bench BenchmarkConsumer -cpu=1

The iteration cost on a mocked client is about 500-700 ns depending on buffer sizes, which gives it a throughput close to 2 million entries a second 🚀. Getting bad results? Make sure to set large buffer sizes in the options.

Documentation

Overview

GTRS (Go Typed Redis Streams) is a library for easily reading Redis streams with or without consumer groups. See https://github.com/dranikpg/gtrs for a quick tutorial.

Variables

var ErrAckBadRetVal = errors.New("XAck made no acknowledgement")

ErrAckBadRetVal is returned when XACK rejects a request by returning 0. This usually indicates that the id is wrong or the stream has no groups.

var NoExpiration = time.Duration(0)

Types

type AckError

type AckError struct {
	Err error
}

AckError indicates that an acknowledgement request failed.

func (AckError) Error

func (ae AckError) Error() string

func (AckError) Unwrap

func (ae AckError) Unwrap() error

type Consumer

type Consumer[T any] interface {
	Chan() <-chan Message[T]
	Close()
}

Consumer is a generic consumer interface

type ConvertibleFrom

type ConvertibleFrom interface {
	FromMap(map[string]any) error
}

ConvertibleFrom is implemented by types that can load themselves from a map.

type ConvertibleTo

type ConvertibleTo interface {
	ToMap() map[string]any
}

ConvertibleTo is implemented by types that can convert themselves to a map.

type FieldParseError

type FieldParseError struct {
	Field string
	Value any
	Err   error
}

FieldParseError is returned by the default parser if data for a field is present, but:

  • it is not assignable, or
  • the field is of an unsupported type

func (FieldParseError) Error

func (fpe FieldParseError) Error() string

func (FieldParseError) Unwrap

func (fpe FieldParseError) Unwrap() error

type GroupConsumer

type GroupConsumer[T any] struct {
	Consumer[T]
	// contains filtered or unexported fields
}

GroupConsumer is a consumer that reads from a consumer group and is similar to a StreamConsumer. Message acknowledgements can be sent asynchronously via Ack(). The consumer has to be closed to release resources and stop goroutines.

func NewGroupConsumer

func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name, stream, lastID string, cfgs ...GroupConsumerConfig) *GroupConsumer[T]

NewGroupConsumer creates a new GroupConsumer with optional configuration.

func (*GroupConsumer[T]) Ack

func (gc *GroupConsumer[T]) Ack(msg Message[T])

Ack queues an asynchronous XAck acknowledgement for the passed message.

NOTE: Ack sometimes provides backpressure, so it should be only used inside the consumer loop or with another goroutine handling errors from the consumer channel. Otherwise it may deadlock.
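
As a generic illustration of what backpressure means here (this is not the library's implementation), a bounded queue of pending acknowledgements can be modelled with a buffered channel. The ackQueue type and its methods are hypothetical; tryAck uses a non-blocking send so the full-queue case is visible instead of hanging.

```go
package main

import "fmt"

// ackQueue is a hypothetical bounded queue of message IDs awaiting XACK.
type ackQueue struct {
	pending chan string
}

func newAckQueue(size int) *ackQueue {
	return &ackQueue{pending: make(chan string, size)}
}

// tryAck enqueues id without blocking and reports whether there was room.
// A plain blocking send would wait here instead: that wait is backpressure.
func (q *ackQueue) tryAck(id string) bool {
	select {
	case q.pending <- id:
		return true
	default:
		return false
	}
}

// drainOne simulates a worker completing one acknowledgement.
func (q *ackQueue) drainOne() string {
	return <-q.pending
}

func main() {
	q := newAckQueue(2)
	fmt.Println(q.tryAck("1-0"), q.tryAck("2-0"), q.tryAck("3-0")) // prints true true false
	q.drainOne()
	fmt.Println(q.tryAck("3-0")) // prints true
}
```

If the side that drains the queue can itself be blocked waiting for the caller, a blocking enqueue can wait forever, which is the deadlock the note warns about.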

func (*GroupConsumer[T]) AwaitAcks

func (gc *GroupConsumer[T]) AwaitAcks() []Message[T]

AwaitAcks blocks until all ack requests made so far are processed and returns a slice of Messages with the AckErrors that occurred during the wait.

func (*GroupConsumer[T]) Chan

func (gc *GroupConsumer[T]) Chan() <-chan Message[T]

Chan returns the main channel with new messages.

This channel is closed:

  • when the consumer is closed
  • immediately on context cancel
  • in case of a ReadError

func (*GroupConsumer[T]) Close

func (gc *GroupConsumer[T]) Close() []string

Close closes the consumer (if not already closed) and returns a slice of unprocessed ack requests. An ack request is unprocessed if it wasn't sent or its error wasn't consumed.

type GroupConsumerConfig

type GroupConsumerConfig struct {
	StreamConsumerConfig
	AckBufferSize uint
}

GroupConsumerConfig provides basic configuration for GroupConsumer.

type Message

type Message[T any] struct {
	ID     string
	Stream string
	Err    error
	Data   T
}

A generic message with an ID, stream, data and an optional error.

type ParseError

type ParseError struct {
	Data map[string]interface{} // raw data returned by the redis client
	Err  error
}

ParseError indicates an error during parsing.

func (ParseError) Error

func (pe ParseError) Error() string

func (ParseError) Unwrap

func (pe ParseError) Unwrap() error

type ReadError

type ReadError struct {
	Err error
}

ReadError indicates an error with the redis client.

After a consumer returns a ReadError, it closes its main channel.

func (ReadError) Error

func (ce ReadError) Error() string

func (ReadError) Unwrap

func (ce ReadError) Unwrap() error

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream represents a redis stream with messages of type T.

func NewStream

func NewStream[T any](client redis.Cmdable, stream string, ttl time.Duration) Stream[T]

NewStream creates a new stream with messages of type T. TTL is an optional parameter that sets up expiration for stream messages; it only works as expected when a non-custom id is used to Add a message. TTL can be zero to disable expiration. Note that expiration is applied when messages are added, so Range requests won't clean up old messages.

func (Stream[T]) Add

func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error)

Add a message to the stream. Calls XADD.

func (Stream[T]) Key

func (s Stream[T]) Key() string

Key returns the redis stream key.

func (Stream[T]) Len

func (s Stream[T]) Len(ctx context.Context) (int64, error)

Len returns the current stream length. Calls XLEN.

func (Stream[T]) Range

func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)

Range returns a portion of the stream. Calls XRANGE.

func (Stream[T]) RevRange added in v0.2.1

func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)

RevRange returns a portion of the stream in reverse order compared to Range. Calls XREVRANGE.

type StreamConsumer

type StreamConsumer[T any] struct {
	Consumer[T]
	// contains filtered or unexported fields
}

StreamConsumer is a consumer that reads from one or multiple redis streams. The consumer has to be closed to release resources and stop goroutines.

func NewConsumer

func NewConsumer[T any](ctx context.Context, rdb redis.Cmdable, ids StreamIDs, cfgs ...StreamConsumerConfig) *StreamConsumer[T]

NewConsumer creates a new StreamConsumer with optional configuration.

func (*StreamConsumer[T]) Chan

func (sc *StreamConsumer[T]) Chan() <-chan Message[T]

Chan returns the main channel with new messages.

This channel is closed:

  • when the consumer is closed
  • immediately on context cancel
  • in case of a ReadError

func (*StreamConsumer[T]) Close

func (sc *StreamConsumer[T]) Close() StreamIDs

Close returns the StreamIDs that show up to which entry each stream was consumed.

The StreamIDs can be used to construct a new StreamConsumer that will pick up where this one left off.

type StreamConsumerConfig

type StreamConsumerConfig struct {
	Block      time.Duration // milliseconds to block before timing out. 0 means infinite
	Count      int64         // maximum number of entries per request. 0 means not limited
	BufferSize uint          // how many entries to prefetch at most
}

StreamConsumerConfig provides basic configuration for StreamConsumer. It can be passed as the last argument to NewConsumer.

type StreamIDs

type StreamIDs = map[string]string
