Documentation
Overview ¶
Package goxstreams lets you post and process messages asynchronously using Redis Streams.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer[E any] struct {
	// contains filtered or unexported fields
}
Consumer is a wrapper for easily reading messages from a Redis stream.
func NewConsumer ¶
func NewConsumer[E any](
	client RedisClient,
	worker Worker[E],
	config ConsumerConfig,
) Consumer[E]
NewConsumer constructs a Consumer.
func NewConsumerWithConverter ¶ added in v0.1.1
func NewConsumerWithConverter[E any](
	client RedisClient,
	worker Worker[E],
	convertTo func(event map[string]interface{}) (*E, error),
	config ConsumerConfig,
) Consumer[E]
NewConsumerWithConverter constructs a Consumer that uses a custom converter.
Since Redis Streams messages are limited to a flat structure, there are three ways to lay out an event:
- flat: ("foo_key", "foo_val", "bar_key", "bar_val");
- nested JSON or proto in a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}");
- a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}").
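As a rough illustration of the combination layout, a converter with the convertTo signature might look like the sketch below. The Event and Nested types, the key names "foo", "bar" and "foobar", and the field helper are all hypothetical, and the sketch assumes that stream values arrive as strings.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
)

// Nested is a hypothetical nested part of the event, stored as JSON under one key.
type Nested struct {
	FooKey string `json:"foo_key"`
	BarKey string `json:"bar_key"`
}

// Event is a hypothetical event type used only for this illustration.
type Event struct {
	Foo    string
	Bar    int
	FooBar Nested
}

// field returns the string stored under key, or an error if the key is
// absent or not a string (assumption: stream values arrive as strings).
func field(event map[string]interface{}, key string) (string, error) {
	value, ok := event[key].(string)
	if !ok {
		return "", errors.New(key + ": missing or not a string")
	}
	return value, nil
}

// convertTo has the shape NewConsumerWithConverter expects for E = Event.
// It reads two flat keys plus one JSON-encoded key.
func convertTo(event map[string]interface{}) (*Event, error) {
	foo, err := field(event, "foo")
	if err != nil {
		return nil, err
	}
	barRaw, err := field(event, "bar")
	if err != nil {
		return nil, err
	}
	bar, err := strconv.Atoi(barRaw)
	if err != nil {
		return nil, fmt.Errorf("bar: %w", err)
	}
	nestedRaw, err := field(event, "foobar")
	if err != nil {
		return nil, err
	}
	var nested Nested
	if err := json.Unmarshal([]byte(nestedRaw), &nested); err != nil {
		return nil, fmt.Errorf("foobar: %w", err)
	}
	return &Event{Foo: foo, Bar: bar, FooBar: nested}, nil
}

func main() {
	event, err := convertTo(map[string]interface{}{
		"foo":    "foo_val",
		"bar":    "42",
		"foobar": `{"foo_key": "foo_val", "bar_key": "bar_val"}`,
	})
	if err != nil {
		fmt.Println("convert failed:", err)
		return
	}
	fmt.Printf("%+v\n", *event)
}
```

Returning an error for a missing or malformed key, rather than a zero value, keeps malformed messages distinguishable from valid ones.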
func (Consumer[E]) Run ¶
Run starts processing messages from a Redis stream.
It starts two processes: one reads messages with XREADGROUP, the other handles pending messages with XPENDING and XCLAIM. To stop them, cancel the context.
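The general pattern of two background loops that stop when a context is cancelled can be sketched with the standard library alone. This is not the package's implementation: loop, run and runForMillis are hypothetical stand-ins, the intervals are arbitrary, and the two callbacks only count how often they ran.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// loop calls step on every tick until ctx is cancelled.
func loop(ctx context.Context, interval time.Duration, step func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			step()
		}
	}
}

// run is a hypothetical stand-in for a Run-like method: it starts two loops
// and blocks until both have stopped, which happens once ctx is cancelled.
func run(ctx context.Context, readNew, readFailed func()) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		loop(ctx, 10*time.Millisecond, readNew)
	}()
	go func() {
		defer wg.Done()
		loop(ctx, 25*time.Millisecond, readFailed)
	}()
	wg.Wait()
}

// runForMillis runs both loops, lets the context expire after ms
// milliseconds, and reports how often each step ran.
func runForMillis(ms int) (newReads, failedReads int64) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()

	run(ctx,
		func() { atomic.AddInt64(&newReads, 1) },
		func() { atomic.AddInt64(&failedReads, 1) },
	)
	return newReads, failedReads
}

func main() {
	newReads, failedReads := runForMillis(200)
	fmt.Println("new reads:", newReads, "failed reads:", failedReads)
}
```

In a real program the context would more likely come from context.WithCancel or signal.NotifyContext, so that the loops stop on shutdown instead of after a fixed timeout.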
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/khv1one/goxstreams"
	"github.com/redis/go-redis/v9"
)

type ConsumerEvent struct {
	RedisID string
	Foo     string
	Bar     int
}

type Worker[E any] struct {
	Name string
}

func (w Worker[E]) Process(event goxstreams.RedisMessage[ConsumerEvent]) error {
	fmt.Printf("read event from %s: id: %s, retry: %d, body: %v, worker: %v\n",
		"mystream", event.ID, event.RetryCount, event.Body, w.Name)

	return nil
}

func (w Worker[E]) ProcessBroken(broken goxstreams.RedisBrokenMessage) error {
	fmt.Printf("read broken event from %s: id: %s, retry: %d, body: %v, worker: %v, err: %s\n",
		"mystream", broken.ID, broken.RetryCount, broken.Body, w.Name, broken.Error.Error())

	return nil
}

func (w Worker[E]) ProcessDead(dead goxstreams.RedisMessage[ConsumerEvent]) error {
	fmt.Printf("read from %s is dead!!! id: %s, retry: %d, body: %v, worker: %v\n",
		"mystream", dead.ID, dead.RetryCount, dead.Body, w.Name)

	return nil
}

func main() {
	consumerInit := func() goxstreams.Consumer[ConsumerEvent] {
		redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

		config := goxstreams.ConsumerConfig{
			Stream:         "mystream",
			Group:          "mygroup",
			ConsumerName:   "consumer",
			BatchSize:      100,
			MaxConcurrency: 5000,
			NoAck:          false,
			MaxRetries:     3,
			CleaneUp:       false,
			FailReadTime:   1000 * time.Millisecond,
			FailIdle:       5000 * time.Millisecond,
		}

		myConsumer := goxstreams.NewConsumer[ConsumerEvent](redisClient, Worker[ConsumerEvent]{"foo"}, config)

		return myConsumer
	}

	consumerInit().Run(context.Background())
}
type ConsumerConfig ¶
type ConsumerConfig struct {
	Stream         string
	Group          string
	ConsumerName   string
	BatchSize      int64         // Default: 1
	MaxConcurrency int64         // Default: 1
	NoAck          bool          // Default: false
	MaxRetries     int64         // Default: 0
	CleaneUp       bool          // Default: false
	FailIdle       time.Duration // Default: 1 second
	FailReadTime   time.Duration // Default: 2 seconds
}
ConsumerConfig is the set of configuration options for a consumer.
- Stream: name of the stream to read from
- Group: each group processes messages independently of the others
- ConsumerName: client name in the group, may not be unique
- BatchSize: (optional) the number of messages read from the stream per request
- MaxConcurrency: (optional) maximum number of message-processing goroutines
- NoAck: (optional) when true, messages will not be reprocessed if there was an error
- MaxRetries: (optional) the number of times a message will be reprocessed on errors
- CleaneUp: (optional) automatic deletion of messages after successful processing
- FailReadTime: (optional) interval at which failed messages are read
- FailIdle: (optional) the time after which a message is considered corrupted

Example:

ConsumerConfig{
	Stream:         "mystream",
	Group:          "mygroup",
	ConsumerName:   "consumer",
	BatchSize:      100,
	MaxConcurrency: 50,
	MaxRetries:     3,
}
type Producer ¶
type Producer[E any] struct {
	// contains filtered or unexported fields
}
Producer is a wrapper for easily producing messages to a Redis stream.
func NewProducer ¶
func NewProducer[E any](client RedisClient) Producer[E]
NewProducer constructs a Producer.
func NewProducerWithConverter ¶ added in v0.1.1
func NewProducerWithConverter[E any](client RedisClient, convertFrom func(event *E) (map[string]interface{}, error)) Producer[E]
NewProducerWithConverter constructs a Producer that uses a custom converter.
Since Redis Streams messages are limited to a flat structure, there are three ways to lay out an event:
- flat: ("foo_key", "foo_val", "bar_key", "bar_val");
- nested JSON or proto in a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}");
- a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}").
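On the producing side, a converter with the convertFrom signature could build the combination layout roughly as follows. The Event and Nested types and the key names "foo", "bar" and "foobar" are hypothetical and exist only for this sketch.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Nested is a hypothetical nested part of the event, stored as JSON under one key.
type Nested struct {
	FooKey string `json:"foo_key"`
	BarKey string `json:"bar_key"`
}

// Event is a hypothetical event type used only for this illustration.
type Event struct {
	Foo    string
	Bar    int
	FooBar Nested
}

// convertFrom has the shape NewProducerWithConverter expects for E = Event.
// Scalar fields become flat keys; the nested struct is JSON-encoded into one key.
func convertFrom(event *Event) (map[string]interface{}, error) {
	if event == nil {
		return nil, errors.New("nil event")
	}
	nested, err := json.Marshal(event.FooBar)
	if err != nil {
		return nil, fmt.Errorf("foobar: %w", err)
	}
	return map[string]interface{}{
		"foo":    event.Foo,
		"bar":    event.Bar,
		"foobar": string(nested),
	}, nil
}

func main() {
	values, err := convertFrom(&Event{
		Foo:    "foo_val",
		Bar:    42,
		FooBar: Nested{FooKey: "foo_val", BarKey: "bar_val"},
	})
	if err != nil {
		fmt.Println("convert failed:", err)
		return
	}
	fmt.Println(values)
}
```

Keeping frequently inspected fields flat while packing the rest into one JSON key is one reasonable trade-off between readability in the stream and structural freedom.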
func (Producer[E]) Produce ¶
Produce pushes a message to a Redis stream.
With the default converter, the Redis message will look like:
- "xadd" "mystream" "*" "body" "{\"Message\":\"message\",\"Name\":\"name\",\"Foo\":712,\"Bar\":947}"
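The xadd line above suggests that the whole event is JSON-encoded and stored under a single "body" key. A minimal sketch of that layout, assuming plain encoding/json marshalling, is shown here; the Event type and the bodyFields helper are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical event whose fields match the xadd example.
type Event struct {
	Message string
	Name    string
	Foo     int
	Bar     int
}

// bodyFields JSON-encodes the event and stores it under a single "body" key.
// This mirrors the layout shown in the xadd example; it is an assumption
// about the default converter, not its actual code.
func bodyFields(event interface{}) (map[string]interface{}, error) {
	payload, err := json.Marshal(event)
	if err != nil {
		return nil, fmt.Errorf("marshal event: %w", err)
	}
	return map[string]interface{}{"body": string(payload)}, nil
}

func main() {
	fields, err := bodyFields(Event{Message: "message", Name: "name", Foo: 712, Bar: 947})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(fields["body"])
}
```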
Example ¶
type ProducerEvent struct {
	Foo string
	Bar int
}

producer := goxstreams.NewProducer[ProducerEvent](redis.NewClient(&redis.Options{Addr: "localhost:6379"}))

_ = producer.Produce(context.Background(), ProducerEvent{"foo", 1}, "mystream")
type RedisBrokenMessage ¶ added in v0.1.0
RedisBrokenMessage is passed to the Worker.ProcessBroken method. It contains the event body and additional info.
type RedisClient ¶
type RedisClient interface { redis.Cmdable }
RedisClient is the client interface; it is required so that a cluster client can be used.
type RedisMessage ¶ added in v0.1.0
RedisMessage is passed to the Worker.Process and Worker.ProcessDead methods. It contains the event body and additional info.
type Worker ¶
type Worker[E any] interface {
	Process(event RedisMessage[E]) error
	ProcessBroken(event RedisBrokenMessage) error
	ProcessDead(event RedisMessage[E]) error
}
Worker is an interface for processing messages from a Redis stream.