Documentation
¶
Overview ¶
Package goxstreams lets you to post and processes messages asynchronously using Redis Streams
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer[E any] struct { // contains filtered or unexported fields }
Consumer is a wrapper to easily getting messages from redis stream.
func NewConsumer ¶
func NewConsumer[E any]( client RedisClient, converter ConsumerConverter[E], worker Worker[E], config ConsumerConfig, ) Consumer[E]
NewConsumer is a constructor Consumer struct.
func (Consumer[E]) Run ¶
Run is a method to start processing messages from redis stream.
Example ¶
package main
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/khv1one/goxstreams"
"github.com/redis/go-redis/v9"
)
type ConsumerEvent struct {
RedisID string
Foo string
Bar int
}
type ConsumerConverter[E any] struct{}
func (c ConsumerConverter[E]) To(id string, event map[string]interface{}) (ConsumerEvent, error) {
result := ConsumerEvent{}
foo, ok := event["foo"].(string)
if !ok {
return result, errors.New("error convert to EventStruct, foo is not exist")
}
barStr, ok := event["bar"].(string)
if !ok {
return result, errors.New("error convert to EventStruct, bar is not exist")
}
bar, err := strconv.Atoi(barStr)
if err != nil {
return result, err
}
result.RedisID = id
result.Foo = foo
result.Bar = bar
return result, nil
}
type Worker[E any] struct {
Name string
}
func (w Worker[E]) Process(event ConsumerEvent) error {
fmt.Printf("read event from %v: %v, worker: %v\n", "mystream", event, w.Name)
return nil
}
func (w Worker[E]) ProcessBroken(broken map[string]interface{}) error {
fmt.Printf("read broken event from %v: %v, worker: %v\n", "mystream", broken, w.Name)
return nil
}
func (w Worker[E]) ProcessDead(dead ConsumerEvent) error {
fmt.Printf("event %v from stream %v is dead!, worker: %v\n", dead.RedisID, "mystream", w.Name)
return nil
}
func (c ConsumerConverter[E]) GetRedisID(event ConsumerEvent) string {
return event.RedisID
}
func main() {
consumerInit := func() goxstreams.Consumer[ConsumerEvent] {
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
config := goxstreams.ConsumerConfig{
Stream: "mystream",
Group: "mygroup",
ConsumerName: "consumer",
BatchSize: 100,
MaxConcurrency: 5000,
NoAck: false,
MaxRetries: 3,
CleaneUp: false,
FailReadTime: 1000 * time.Millisecond,
FailIdle: 5000 * time.Millisecond,
}
myConsumer := goxstreams.NewConsumer[ConsumerEvent](
redisClient,
ConsumerConverter[ConsumerEvent]{},
Worker[ConsumerEvent]{"foo"},
config,
)
return myConsumer
}
consumerInit().Run(context.Background())
}
type ConsumerConfig ¶
type ConsumerConverter ¶
type ConsumerConverter[E any] interface { To(id string, event map[string]interface{}) (E, error) GetRedisID(E) string }
ConsumerConverter is an interface for convert hash to business model.
type Producer ¶
type Producer[E any] struct { // contains filtered or unexported fields }
Producer is a wrapper to easily produce messages to redis stream.
func NewProducer ¶
func NewProducer[E any](client RedisClient, converter ProducerConverter[E]) Producer[E]
NewProducer is a constructor Producer struct.
func (Producer[E]) Produce ¶
Produce method for push message to redis stream.
Example ¶
package main
import (
"context"
"github.com/khv1one/goxstreams"
"github.com/redis/go-redis/v9"
)
type ProducerEvent struct {
Foo string
Bar int
}
type ProduceConverter[E any] struct{}
func (c ProduceConverter[E]) From(event ProducerEvent) map[string]interface{} {
result := make(map[string]interface{})
result["foo"] = event.Foo
result["bar"] = event.Bar
return result
}
func main() {
converter := ProduceConverter[ProducerEvent]{}
producer := goxstreams.NewProducer[ProducerEvent](redis.NewClient(&redis.Options{Addr: "localhost:6379"}), converter)
event := ProducerEvent{"foo", 1}
_ = producer.Produce(context.Background(), event, "mystream")
}
type ProducerConverter ¶
ProducerConverter is an interface for converting business model to hash.
type RedisClient ¶
type RedisClient interface {
redis.Cmdable
}
RedisClient required to use cluster client
Click to show internal directories.
Click to hide internal directories.
