gtrs

v0.6.0
Published: Dec 24, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

Go Typed Redis Streams

Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?

consumer := NewGroupConsumer[MyType](...)
for msg := range consumer.Chan() {
  // Handle message
  consumer.Ack(msg)
}

Wait...it is! 🔥

Quickstart

Define a type that represents your stream data. It'll be parsed automatically with all field names converted to snake case. Missing fields will be skipped silently. You can also use the ConvertibleFrom and ConvertibleTo interfaces to do custom parsing. Struct tags can be used to rename fields.

// maps to {"name": , "priority": , "to":}
type Event struct {
  Name     string
  Priority int
  Target   string `gtrs:"to"`
}
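To make the naming rule concrete, here is a minimal sketch of how field names could be derived from a struct: the `gtrs` tag wins when present, otherwise the field name is converted to snake case. The helpers `toSnakeCase` and `fieldKeys` are hypothetical and written only for illustration; they are not the library's implementation, and acronym handling (for example `ID`) may differ in the real parser.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// Event mirrors the struct from the quickstart.
type Event struct {
	Name     string
	Priority int
	Target   string `gtrs:"to"`
}

// toSnakeCase is a hypothetical helper: it lowercases the name and puts an
// underscore before every upper-case rune except the first one.
func toSnakeCase(name string) string {
	var b strings.Builder
	for i, r := range name {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('_')
			}
			r = unicode.ToLower(r)
		}
		b.WriteRune(r)
	}
	return b.String()
}

// fieldKeys lists the stream field name for each struct field: the gtrs tag
// if one is present, otherwise the snake-case form of the field name.
func fieldKeys(v any) []string {
	t := reflect.TypeOf(v)
	keys := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if tag, ok := f.Tag.Lookup("gtrs"); ok {
			keys = append(keys, tag)
			continue
		}
		keys = append(keys, toSnakeCase(f.Name))
	}
	return keys
}

func main() {
	fmt.Println(fieldKeys(Event{})) // [name priority to]
}
```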

Consumers

Consumers allow reading Redis streams through Go channels. Specify a context, a Redis client and where to start reading. Pass a StreamConsumerConfig if the defaults don't suit you or you want optimal performance. New entries are fetched asynchronously to provide a fast flow 🚂

cs := NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})

for msg := range cs.Chan() {
  if msg.Err != nil {
    continue
  }
  var event Event = msg.Data
}

Don't forget to Close() the consumer. If you want to start reading again where you left off, you can save the last StreamIDs.

ids := cs.Close()

Group Consumers

They work just like regular consumers and allow sending acknowledgements asynchronously. Be sure to call Ack only while you keep processing new messages, that is, inside a consuming loop or from another goroutine. Even though this introduces a two-sided dependency, the consumer avoids deadlocks.

cs := NewGroupConsumer[Event](ctx, rdb, "group", "consumer", "stream", ">")

for msg := range cs.Chan() {
  cs.Ack(msg)
}

Stopped processing? Check your errors 🔎

// Wait for all acknowledgements to complete
ackErrs := cs.AwaitAcks() // named ackErrs so the errors package is not shadowed

// Acknowledgements that were not sent yet or their errors were not consumed
remaining := cs.Close()

Error handling

This is where the simplicity fades a little, but only a little :) The channel provides not just values, but also errors. Those can be of only three types:

  • ReadError reports a failed XRead/XReadGroup request. Consumer will close the channel after this error
  • AckError reports a failed XAck request
  • ParseError speaks for itself

Consumers don't send errors on cancellation and immediately close the channel.

switch errv := msg.Err.(type) {
case nil: // This interface-nil comparison is safe
  fmt.Println("Got", msg.Data)
case ReadError:
  fmt.Println("ReadError caused by", errv.Err)
  return // last message in channel
case AckError:
  fmt.Printf("Ack failed %v-%v caused by %v\n", msg.Stream, msg.ID, errv.Err)
case ParseError:
  fmt.Println("Failed to parse", errv.Data)
}

All those types are wrapping errors. For example, ParseError can be unwrapped to:

  • Find out why the default parser failed via FieldParseError (e.g. assigning string to int field)
  • Catch custom errors from ConvertibleFrom

var fpe FieldParseError
if errors.As(msg.Err, &fpe) {
  fmt.Printf("Failed to parse field %v because %v", fpe.Field, fpe.Err)
}

errors.Is(msg.Err, errMyTypeFailedToParse)
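The unwrapping described above follows the standard `errors.As` / `errors.Is` rules. The sketch below uses local stand-in types that copy the documented shapes of ParseError and FieldParseError so that it runs without Redis; the sentinel `errBadNumber` and the helpers `describe` and `isBadNumber` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ParseError is a local stand-in with the same shape as the documented type.
type ParseError struct {
	Data map[string]interface{}
	Err  error
}

func (pe ParseError) Error() string { return "parse error: " + pe.Err.Error() }
func (pe ParseError) Unwrap() error { return pe.Err }

// FieldParseError is a simplified stand-in for the documented type.
type FieldParseError struct {
	Field string
	Err   error
}

func (fpe FieldParseError) Error() string { return "field " + fpe.Field + ": " + fpe.Err.Error() }
func (fpe FieldParseError) Unwrap() error { return fpe.Err }

// errBadNumber is a hypothetical sentinel, such as a custom parser might return.
var errBadNumber = errors.New("not a number")

// describe walks the wrap chain with errors.As to find a FieldParseError.
func describe(err error) string {
	var fpe FieldParseError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &fpe):
		return "bad field " + fpe.Field
	default:
		return "other error"
	}
}

// isBadNumber reports whether the sentinel is anywhere in the wrap chain.
func isBadNumber(err error) bool { return errors.Is(err, errBadNumber) }

func main() {
	err := ParseError{
		Data: map[string]interface{}{"priority": "high"},
		Err:  FieldParseError{Field: "priority", Err: errBadNumber},
	}
	fmt.Println(describe(err))    // bad field priority
	fmt.Println(isBadNumber(err)) // true
}
```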

Streams

Streams are simple wrappers for basic redis commands on a stream.

stream := NewStream[Event](rdb, "my-stream", &Options{TTL: time.Hour, MaxLen: 1000, Approx: true})
stream.Add(ctx, Event{
  Name:     "Example event",
  Priority: 1,
})

The Options.TTL parameter will evict stream entries after the specified duration has elapsed (or it can be set to NoExpiration). The Options.MaxLen parameter will remove older stream entries to accommodate newer entries after the maximum number of entries is reached. The Options.Approx parameter provides better efficiency by using almost exact trimming.
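One way to picture TTL eviction: auto-generated Redis stream IDs begin with a Unix timestamp in milliseconds, so "older than the TTL" can be expressed as a minimum ID. The sketch below computes such a threshold. It is an assumption about how eviction could work (for instance via the MINID option of XADD), not a statement of what the library does internally; `minIDForTTL` and `exampleNow` are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// exampleNow is a fixed instant so that the output is reproducible.
var exampleNow = time.UnixMilli(1_700_000_000_000)

// minIDForTTL returns the millisecond timestamp below which entries would be
// considered expired at time now. Entries with custom, non-timestamp IDs
// cannot be compared this way.
func minIDForTTL(now time.Time, ttl time.Duration) string {
	return strconv.FormatInt(now.Add(-ttl).UnixMilli(), 10)
}

func main() {
	fmt.Println(minIDForTTL(exampleNow, time.Hour)) // 1699996400000
}
```

This also suggests why the TTL is documented to work as expected only when a non-custom id is used.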

Metadata

The package defines a Metadata type as:

type Metadata map[string]any

This allows serialization (and deserialization) of generic structured metadata within the stream entries. Any value that can be serialized to JSON can be stored in a field of this type (it uses the JSON marshaller under the hood). For example:

stream.Add(ctx, EventWithMetadata{
  Kind:     "Example event",
  Priority: 1,
  Meta: Metadata{"string": "foobar", "float": float64(1234.5)},
})
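Because the metadata goes through JSON, a round trip follows the usual encoding/json rules. The sketch below uses a local Metadata type with the same definition and two hypothetical helpers, `encode` and `decode`, to show what that implies; how the library lays the JSON out inside a stream entry is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Metadata is a local stand-in with the same definition as in the text.
type Metadata map[string]any

// encode serializes the metadata to a JSON string (map keys come out sorted).
func encode(m Metadata) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

// decode parses a JSON object back into Metadata.
func decode(s string) (Metadata, error) {
	var m Metadata
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	s, err := encode(Metadata{"string": "foobar", "float": float64(1234.5)})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"float":1234.5,"string":"foobar"}

	m, err := decode(s)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %T\n", m["string"], m["float"]) // string float64
}
```

Note that encoding/json decodes every JSON number stored in an `any` as float64, so an int written into the metadata will not come back as an int.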

Installation

go get github.com/dranikpg/gtrs

Gtrs is still in its early stages and might change in further releases.

Performance

go test -run ^$ -bench BenchmarkConsumer -cpu=1

The iteration cost on a mocked client is about 500-700 ns depending on buffer sizes, which gives it a throughput close to 2 million entries a second 🚀. Getting bad results? Make sure to set large buffer sizes in the options.
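The throughput figure follows directly from the per-iteration cost. As a quick check of the arithmetic, with `entriesPerSecond` being a helper written just for this illustration:

```go
package main

import "fmt"

// entriesPerSecond converts a per-entry cost in nanoseconds into throughput.
func entriesPerSecond(nsPerEntry float64) float64 {
	return 1e9 / nsPerEntry
}

func main() {
	fmt.Printf("%.0f\n", entriesPerSecond(500)) // 2000000
	fmt.Printf("%.0f\n", entriesPerSecond(700)) // 1428571
}
```

So the 2 million figure corresponds to the 500 ns end of the range; at 700 ns the rate is closer to 1.4 million entries a second.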

Documentation

Overview

GTRS (Go Typed Redis Streams) is a library for easily reading Redis streams with or without consumer groups. See https://github.com/dranikpg/gtrs for a quick tutorial.

Variables

var ErrAckBadRetVal = errors.New("XAck made no acknowledgement")

ErrAckBadRetVal is caused by XACK not accepting a request and returning 0. This usually indicates that the id is wrong or the stream has no groups.

var NoExpiration = time.Duration(0)

var NoMaxLen = int64(0)

Types

type AckError

type AckError struct {
	Err error
}

AckError indicates that an acknowledgement request failed.

func (AckError) Error

func (ae AckError) Error() string

func (AckError) Unwrap

func (ae AckError) Unwrap() error

type AsciiTime added in v0.6.0

type AsciiTime = gtrsconvert.AsciiTime

AsciiTime is a type that wraps time.Time, but its MarshalBinary and UnmarshalBinary methods are overridden to use the same format as the text marshalling methods.
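The idea can be sketched with a small wrapper whose binary marshalling delegates to the text form of time.Time (RFC 3339). The type `textTime`, the helper `roundTrip` and the value `exampleTime` are hypothetical stand-ins written for illustration; this is not the gtrsconvert source.

```go
package main

import (
	"fmt"
	"time"
)

// textTime is a hypothetical stand-in that wraps time.Time.
type textTime struct{ time.Time }

// MarshalBinary delegates to MarshalText, so the binary form is readable text.
func (t textTime) MarshalBinary() ([]byte, error) { return t.Time.MarshalText() }

// UnmarshalBinary parses the same text form back into the wrapped time.
func (t *textTime) UnmarshalBinary(data []byte) error { return t.Time.UnmarshalText(data) }

// exampleTime is a fixed instant so that the output is reproducible.
var exampleTime = time.Date(2023, time.December, 24, 12, 0, 0, 0, time.UTC)

// roundTrip marshals t and parses it back, returning the wire form as well.
func roundTrip(t time.Time) (string, time.Time, error) {
	b, err := textTime{t}.MarshalBinary()
	if err != nil {
		return "", time.Time{}, err
	}
	var out textTime
	if err := out.UnmarshalBinary(b); err != nil {
		return "", time.Time{}, err
	}
	return string(b), out.Time, nil
}

func main() {
	wire, back, err := roundTrip(exampleTime)
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)                    // 2023-12-24T12:00:00Z
	fmt.Println(back.Equal(exampleTime)) // true
}
```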

type Consumer

type Consumer[T any] interface {
	Chan() <-chan Message[T]
	Close()
}

Consumer is a generic consumer interface

type ConvertibleFrom

type ConvertibleFrom = gtrsconvert.ConvertibleFrom

ConvertibleFrom is implemented by types that can load themselves from a map.

type ConvertibleTo

type ConvertibleTo = gtrsconvert.ConvertibleTo

ConvertibleTo is implemented by types that can convert themselves to a map.

type FieldParseError

type FieldParseError = gtrsconvert.FieldParseError

FieldParseError is returned when a field fails to be parsed.

type GroupConsumer

type GroupConsumer[T any] struct {
	Consumer[T]
	// contains filtered or unexported fields
}

GroupConsumer is a consumer that reads from a consumer group and is similar to a StreamConsumer. Message acknowledgements can be sent asynchronously via Ack(). The consumer has to be closed to release resources and stop goroutines.

func NewGroupConsumer

func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name, stream, lastID string, cfgs ...GroupConsumerConfig) *GroupConsumer[T]

NewGroupConsumer creates a new GroupConsumer with optional configuration. Only the first configuration element is used.

func NewGroupMultiStreamConsumer added in v0.6.0

func NewGroupMultiStreamConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name string, seenIds StreamIDs, cfgs ...GroupConsumerConfig) *GroupConsumer[T]

NewGroupMultiStreamConsumer creates a new GroupConsumer with optional configuration. Only the first configuration element is used.

func (*GroupConsumer[T]) Ack

func (gc *GroupConsumer[T]) Ack(msg Message[T])

Ack requests an asynchronous XAck acknowledgement for the passed message.

NOTE: Ack sometimes provides backpressure, so it should be only used inside the consumer loop or with another goroutine handling errors from the consumer channel. Otherwise it may deadlock.
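Backpressure here can be pictured as a bounded buffer of pending acknowledgements: once it is full, a blocking send stalls until something drains it. The sketch below models that with a buffered channel and non-blocking operations so that it cannot hang. The type `ackQueue` and its methods are hypothetical; the assumption that pending acks sit in a buffer sized by AckBufferSize is ours and not confirmed by the text.

```go
package main

import "fmt"

// ackQueue is a hypothetical stand-in for a bounded buffer of pending acks.
type ackQueue struct{ pending chan string }

func newAckQueue(size uint) *ackQueue {
	return &ackQueue{pending: make(chan string, size)}
}

// tryAck enqueues id without blocking and reports whether there was room.
// A blocking send in its place would stall the caller until the buffer drains.
func (q *ackQueue) tryAck(id string) bool {
	select {
	case q.pending <- id:
		return true
	default:
		return false
	}
}

// drainOne simulates a background worker taking one ack off the queue.
func (q *ackQueue) drainOne() (string, bool) {
	select {
	case id := <-q.pending:
		return id, true
	default:
		return "", false
	}
}

func main() {
	q := newAckQueue(2)
	fmt.Println(q.tryAck("1-0"), q.tryAck("2-0"), q.tryAck("3-0")) // true true false
	q.drainOne()
	fmt.Println(q.tryAck("3-0")) // true
}
```

If draining depends on the caller reading from the consumer channel, a caller that only sends acks and never reads can block forever, which is the situation the note warns about.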

func (*GroupConsumer[T]) AwaitAcks

func (gc *GroupConsumer[T]) AwaitAcks() []Message[T]

AwaitAcks blocks until all ack requests made so far are processed and returns a slice of Messages with the AckErrors that occurred during the wait.

func (*GroupConsumer[T]) Chan

func (gc *GroupConsumer[T]) Chan() <-chan Message[T]

Chan returns the main channel with new messages.

This channel is closed when:

  • the consumer is closed
  • immediately on context cancel
  • in case of a ReadError
  • in case Ack fails, an AckError is returned

func (*GroupConsumer[T]) Close

func (gc *GroupConsumer[T]) Close() []InnerAck

Close closes the consumer (if not already closed) and returns a slice of unprocessed ack requests. An ack request is unprocessed if it wasn't sent or its error wasn't consumed.

type GroupConsumerConfig

type GroupConsumerConfig struct {
	StreamConsumerConfig
	AckBufferSize uint
}

GroupConsumerConfig provides basic configuration for GroupConsumer.

type InnerAck added in v0.6.0

type InnerAck struct {
	ID     string
	Stream string
}

type Message

type Message[T any] struct {
	ID     string
	Stream string
	Err    error
	Data   T
}

A generic message with an ID, stream, data and an optional error.

type Metadata added in v0.3.0

type Metadata = gtrsconvert.Metadata

Metadata is a type that allows serialization of generic structured metadata within the stream entries. Any value that can be serialized to JSON can be inserted here.

type Options added in v0.3.0

type Options struct {
	// TTL is an optional parameter to specify how long entries stay in the stream before expiring,
	// it only works as expected when a non-custom id is used to Add a message.
	// The default is No Expiration.
	// Note that TTL is performed when messages are Added, so Range requests won't clean up old messages.
	TTL time.Duration
	// MaxLen is an optional parameter to specify the maximum length of the stream.
	MaxLen int64
	// Approx causes MaxLen and TTL to be approximate instead of exact.
	Approx bool
}

type ParseError

type ParseError struct {
	Data map[string]interface{} // raw data returned by the redis client
	Err  error
}

ParseError indicates an error during parsing.

func (ParseError) Error

func (pe ParseError) Error() string

func (ParseError) Unwrap

func (pe ParseError) Unwrap() error

type ReadError

type ReadError struct {
	Err error
}

ReadError indicates an error with the redis client.

After a ReadError was returned from a consumer, it'll close its main channel.

func (ReadError) Error

func (ce ReadError) Error() string

func (ReadError) Unwrap

func (ce ReadError) Unwrap() error

type SerializeError added in v0.3.0

type SerializeError = gtrsconvert.SerializeError

SerializeError is returned when a field fails to be serialized.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream represents a redis stream with messages of type T.

func NewStream

func NewStream[T any](client redis.Cmdable, stream string, opt *Options) Stream[T]

NewStream creates a new stream with messages of type T. Options are optional (the parameter can be nil to use defaults).

func (Stream[T]) Add

func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error)

Add a message to the stream. Calls XADD.

func (Stream[T]) Key

func (s Stream[T]) Key() string

Key returns the redis stream key.

func (Stream[T]) Len

func (s Stream[T]) Len(ctx context.Context) (int64, error)

Len returns the current stream length. Calls XLEN.

func (Stream[T]) Range

func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)

Range returns a portion of the stream. Calls XRANGE.

func (Stream[T]) RevRange added in v0.3.0

func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error)

RevRange returns a portion of the stream in reverse order compared to Range. Calls XREVRANGE.

type StreamConsumer

type StreamConsumer[T any] struct {
	Consumer[T]
	// contains filtered or unexported fields
}

StreamConsumer is a consumer that reads from one or multiple redis streams. The consumer has to be closed to release resources and stop goroutines.

func NewConsumer

func NewConsumer[T any](ctx context.Context, rdb redis.Cmdable, ids StreamIDs, cfgs ...StreamConsumerConfig) *StreamConsumer[T]

NewConsumer creates a new StreamConsumer with optional configuration.

func (*StreamConsumer[T]) Chan

func (sc *StreamConsumer[T]) Chan() <-chan Message[T]

Chan returns the main channel with new messages.

This channel is closed when:

  • the consumer is closed
  • immediately on context cancel
  • in case of a ReadError
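For the caller, the practical consequence of these closing conditions is that a plain `for ... range` loop over the channel ends on its own. The sketch below reproduces the context-cancel case with a stand-in producer goroutine; `produce` and `consumeAll` are hypothetical helpers and no Redis is involved.

```go
package main

import (
	"context"
	"fmt"
)

// produce is a hypothetical stand-in for a consumer's fetch goroutine: it
// emits the given ids and closes the channel, without sending an error, once
// ctx is cancelled.
func produce(ctx context.Context, ids []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, id := range ids {
			select {
			case out <- id:
			case <-ctx.Done():
				return
			}
		}
		<-ctx.Done() // nothing more to send; wait for cancellation
	}()
	return out
}

// consumeAll ranges over the channel, cancels once every id was seen and
// returns how many messages were received before the channel closed.
func consumeAll(ids []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if len(ids) == 0 {
		cancel() // nothing will arrive, so stop right away
	}
	seen := 0
	for range produce(ctx, ids) {
		seen++
		if seen == len(ids) {
			cancel() // the producer closes the channel, which ends the loop
		}
	}
	return seen
}

func main() {
	fmt.Println(consumeAll([]string{"1-0", "2-0", "3-0"})) // 3
}
```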

func (*StreamConsumer[T]) Close

func (sc *StreamConsumer[T]) Close() StreamIDs

Close returns the StreamIDs that show up to which entry the streams were consumed.

The StreamIDs can be used to construct a new StreamConsumer that will pick up where this one left off.
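When restarting, saved positions usually have to be combined with a default for streams that were never read. A minimal sketch, assuming the saved IDs are kept in a map of the same shape; `resumeFrom` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// StreamIDs has the same definition as in the text: stream name to entry ID.
type StreamIDs = map[string]string

// resumeFrom builds the start positions for the given streams: the saved ID
// where one exists, otherwise the fallback (for example "$" for new entries).
func resumeFrom(streams []string, saved StreamIDs, fallback string) StreamIDs {
	ids := make(StreamIDs, len(streams))
	for _, s := range streams {
		if id, ok := saved[s]; ok {
			ids[s] = id
		} else {
			ids[s] = fallback
		}
	}
	return ids
}

func main() {
	saved := StreamIDs{"orders": "1703400000000-0"}
	ids := resumeFrom([]string{"orders", "payments"}, saved, "$")
	fmt.Println(ids["orders"], ids["payments"]) // 1703400000000-0 $
}
```

The resulting map could then be passed as the ids argument of NewConsumer.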

type StreamConsumerConfig

type StreamConsumerConfig struct {
	Block      time.Duration // milliseconds to block before timing out. 0 means infinite
	Count      int64         // maximum number of entries per request. 0 means not limited
	BufferSize uint          // how many entries to prefetch at most
}

StreamConsumerConfig provides basic configuration for StreamConsumer. It can be passed as the last argument to NewConsumer.

type StreamIDs

type StreamIDs = map[string]string
