goxstreams

Version: v0.1.3
Published: Sep 8, 2023 License: GPL-3.0 Imports: 9 Imported by: 0

README

Based on the go-redis library.

goxstreams lets you post and process messages asynchronously using Redis Streams.

  • Reliable - messages are not lost even if your process crashes
  • Retries - if processing a message fails, it is retried the configured number of times after the configured interval
  • Horizontally scalable - set the number of goroutines in each of the application instances running in parallel
  • No low-level plumbing - focus on business logic instead of Redis interaction

Example code is available in the project repository (github.com/khv1one/goxstreams).

Describe the business model

  • Describe the model that we want to put in the stream
package app

type Event struct {
	Message  string
	Name     string
	Foo      int
	Bar      int
	SubEvent SubEvent
}

type SubEvent struct {
	BarBar string
	FooFoo SubSubEvent
}

type SubSubEvent struct {
	FooFooFoo int
}

Producing messages

Initialize your application:
  • create go-redis client
  • create producer
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"example/app"

	"github.com/khv1one/goxstreams"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	producer := goxstreams.NewProducer[app.Event](redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
	go write(producer, ctx)

	fmt.Println("Producer started")
	<-ctx.Done()
}

func write(producer goxstreams.Producer[app.Event], ctx context.Context) {
	for {
		event := app.Event{
			Message: "message", Name: "name", Foo: rand.Intn(1000), Bar: rand.Intn(1000),
			SubEvent: app.SubEvent{
				BarBar: "1234",
				FooFoo: app.SubSubEvent{FooFooFoo: 777},
			},
		}

		err := producer.Produce(ctx, event, "mystream")
		if err != nil {
			fmt.Printf("write error %v\n", err)
			time.Sleep(time.Second)
			continue
		}

		fmt.Printf("produced %v\n", event)

		time.Sleep(100 * time.Millisecond)
	}
}

A single producer can publish to several streams, because the stream name is passed on each Produce call.

Processing messages

Describe the worker
package app

import (
	"errors"
	"fmt"
	"math/rand"
	"time"

	"github.com/khv1one/goxstreams"
)

type Worker[E interface{ Event }] struct {
	Name string
}

func NewWorker[E interface{ Event }](name string) Worker[E] {
	return Worker[E]{Name: name}
}

func (w Worker[E]) Process(event goxstreams.RedisMessage[E]) error {
	time.Sleep(1000 * time.Millisecond)

	a := rand.Intn(20)
	if a == 0 {
		return errors.New("rand error")
	} else {
		fmt.Printf("read event from %s: id: %s, retry: %d, body: %v, worker: %v\n",
			"mystream", event.ID, event.RetryCount, event.Body, w.Name)
	}

	return nil
}

func (w Worker[E]) ProcessBroken(broken goxstreams.RedisBrokenMessage) error {
	fmt.Printf("read broken event from %s: id: %s, retry: %d, body: %v, worker: %v, err: %s\n",
		"mystream", broken.ID, broken.RetryCount, broken.Body, w.Name, broken.Error.Error())

	return nil
}

func (w Worker[E]) ProcessDead(dead goxstreams.RedisMessage[E]) error {
	fmt.Printf("read from %s is dead!!! id: %s, retry: %d, body: %v, worker: %v\n",
		"mystream", dead.ID, dead.RetryCount, dead.Body, w.Name)

	return nil
}

You need to implement 3 methods:

  • Process: normal message processing (including reprocessing)
  • ProcessBroken: handling of messages that could not be converted to the model (for example, storing them in a database for later investigation)
  • ProcessDead: handling of messages whose retry count exceeded the number specified in the config
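
The three cases can be pictured as a small routing decision. The sketch below is not goxstreams code: the `route` helper and the exact comparison against `MaxRetries` are assumptions made for illustration, and only the three method names come from the worker shown above.

```go
package main

import "fmt"

// route is a hypothetical helper, not part of goxstreams. It names the
// worker method that would handle a message under the rules described above.
func route(decoded bool, retryCount, maxRetries int) string {
	switch {
	case !decoded:
		// The payload could not be converted to the model.
		return "ProcessBroken"
	case retryCount > maxRetries:
		// Assumption: "exceeded" means strictly greater than MaxRetries.
		return "ProcessDead"
	default:
		// First delivery, or a retry that is still within the limit.
		return "Process"
	}
}

func main() {
	fmt.Println(route(true, 0, 3))  // prints Process
	fmt.Println(route(true, 4, 3))  // prints ProcessDead
	fmt.Println(route(false, 1, 3)) // prints ProcessBroken
}
```

Checking the decode result first reflects the fact that a broken message has no typed body that could be passed to the other two methods.
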
Initialize your application:
  • create go-redis client
  • create worker object
  • create consumer config
  • create consumer
  • run consumer
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/khv1one/goxstreams/internal/app"

	"github.com/khv1one/goxstreams"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	consumerCtx, cancel := context.WithCancel(ctx)
	defer cancel() // keep the cancel function so the consumer can be stopped

	consumerInit().Run(consumerCtx)
	fmt.Println("Consumer Started")

	<-ctx.Done()
}

func consumerInit() goxstreams.Consumer[app.Event] {
	redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	config := goxstreams.ConsumerConfig{
		Stream:         "mystream",
		Group:          "mygroup",
		ConsumerName:   "consumer",
		BatchSize:      100,
		MaxConcurrency: 5000,
		NoAck:          false,
		MaxRetries:     3,
		CleaneUp:       false,
		FailReadTime:   1000 * time.Millisecond,
		FailIdle:       5000 * time.Millisecond,
	}

	myConsumer := goxstreams.NewConsumer[app.Event](redisClient, app.NewWorker[app.Event]("foo"), config)

	return myConsumer
}

Config description
  • Stream -- the name of the stream to read messages from
  • Group -- the consumer group; each group processes messages independently of the others
  • ConsumerName -- the client name within the group; it does not have to be unique
  • BatchSize -- the number of messages read from the stream per request
  • MaxConcurrency -- the maximum number of goroutines processing messages
  • NoAck -- when true, messages are not reprocessed after an error
  • MaxRetries -- the number of times a message is reprocessed after errors
  • CleaneUp -- automatically delete messages after successful processing
  • FailReadTime -- the interval at which failed messages are read
  • FailIdle -- the time after which a message is considered corrupted

Benchmarks

WIP

Documentation

Overview

Package goxstreams lets you post and process messages asynchronously using Redis Streams.

Types

type Consumer

type Consumer[E any] struct {
	// contains filtered or unexported fields
}

Consumer is a wrapper that makes it easy to read messages from a Redis stream.

func NewConsumer

func NewConsumer[E any](
	client RedisClient, worker Worker[E], config ConsumerConfig,
) Consumer[E]

NewConsumer is the constructor for the Consumer struct.

func NewConsumerWithConverter added in v0.1.1

func NewConsumerWithConverter[E any](
	client RedisClient, worker Worker[E], convertTo func(event map[string]interface{}) (*E, error), config ConsumerConfig,
) Consumer[E]

NewConsumerWithConverter is the constructor for a Consumer struct with a custom converter.

Since Redis Streams messages are limited to a flat structure, the following layouts are available:

  • flat: ("foo_key", "foo_val", "bar_key", "bar_val")
  • nested JSON or proto under a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}")
  • a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}")
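
A converter for the combined layout might look like the sketch below. It uses only the standard library so that it runs on its own. The `Order` and `Customer` models and the field names `id` and `customer` are hypothetical, and the sketch assumes field values arrive as strings.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Customer and Order are hypothetical models used only in this sketch.
type Customer struct {
	Name string `json:"name"`
	City string `json:"city"`
}

type Order struct {
	ID       string
	Customer Customer
}

// convertTo has the shape NewConsumerWithConverter expects for E = Order.
// It reads a flat "id" field plus a "customer" field holding nested JSON.
func convertTo(event map[string]interface{}) (*Order, error) {
	id, ok := event["id"].(string)
	if !ok {
		return nil, errors.New("field id is missing or not a string")
	}
	raw, ok := event["customer"].(string)
	if !ok {
		return nil, errors.New("field customer is missing or not a string")
	}
	var c Customer
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return nil, fmt.Errorf("decode customer: %w", err)
	}
	return &Order{ID: id, Customer: c}, nil
}

func main() {
	order, err := convertTo(map[string]interface{}{
		"id":       "42",
		"customer": `{"name":"Ann","city":"Oslo"}`,
	})
	fmt.Println(order, err)
}
```

Returning an error for a missing or malformed field is presumably what leads to such a message being reported as broken rather than processed normally.
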

func (Consumer[E]) Run

func (c Consumer[E]) Run(ctx context.Context)

Run starts processing messages from the Redis stream.

It starts two processes: one that reads new messages (XREADGROUP) and one that reclaims pending messages (XPENDING + XCLAIM). To stop them, cancel the context.
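
The stop-by-cancellation pattern can be sketched with the standard library alone. Here `runLoop` is a hypothetical stand-in for a long-running consumer loop, not goxstreams code, and `runAndCancel` exists only to drive it deterministically.

```go
package main

import (
	"context"
	"fmt"
)

// runLoop is a hypothetical stand-in for a consumer loop. It handles one
// unit of work per tick and returns the count once ctx is cancelled.
func runLoop(ctx context.Context, ticks <-chan struct{}) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case <-ticks:
			handled++
		}
	}
}

// runAndCancel feeds n ticks to runLoop, cancels the context, and reports
// how many ticks were handled before the loop stopped.
func runAndCancel(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ticks := make(chan struct{})
	result := make(chan int)
	go func() { result <- runLoop(ctx, ticks) }()

	for i := 0; i < n; i++ {
		ticks <- struct{}{} // unbuffered: returns once the loop received it
	}
	cancel()
	return <-result
}

func main() {
	fmt.Println(runAndCancel(3)) // prints 3
}
```

With a real consumer the same idea applies: keep the cancel function returned by context.WithCancel and call it when the application shuts down.
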

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/khv1one/goxstreams"
	"github.com/redis/go-redis/v9"
)

type ConsumerEvent struct {
	RedisID string
	Foo     string
	Bar     int
}

type Worker[E any] struct {
	Name string
}

func (w Worker[E]) Process(event goxstreams.RedisMessage[ConsumerEvent]) error {
	fmt.Printf("read event from %s: id: %s, retry: %d, body: %v, worker: %v\n",
		"mystream", event.ID, event.RetryCount, event.Body, w.Name)

	return nil
}

func (w Worker[E]) ProcessBroken(broken goxstreams.RedisBrokenMessage) error {
	fmt.Printf("read broken event from %s: id: %s, retry: %d, body: %v, worker: %v, err: %s\n",
		"mystream", broken.ID, broken.RetryCount, broken.Body, w.Name, broken.Error.Error())

	return nil
}

func (w Worker[E]) ProcessDead(dead goxstreams.RedisMessage[ConsumerEvent]) error {
	fmt.Printf("read from %s is dead!!! id: %s, retry: %d, body: %v, worker: %v\n",
		"mystream", dead.ID, dead.RetryCount, dead.Body, w.Name)

	return nil
}

func main() {
	consumerInit := func() goxstreams.Consumer[ConsumerEvent] {
		redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

		config := goxstreams.ConsumerConfig{
			Stream:         "mystream",
			Group:          "mygroup",
			ConsumerName:   "consumer",
			BatchSize:      100,
			MaxConcurrency: 5000,
			NoAck:          false,
			MaxRetries:     3,
			CleaneUp:       false,
			FailReadTime:   1000 * time.Millisecond,
			FailIdle:       5000 * time.Millisecond,
		}

		myConsumer := goxstreams.NewConsumer[ConsumerEvent](redisClient, Worker[ConsumerEvent]{"foo"}, config)

		return myConsumer
	}

	consumerInit().Run(context.Background())
}

type ConsumerConfig

type ConsumerConfig struct {
	Stream         string
	Group          string
	ConsumerName   string
	BatchSize      int64 // Default: 1
	MaxConcurrency int64 // Default: 1
	NoAck          bool  // Default: false
	MaxRetries     int64 // Default: 0
	CleaneUp       bool  // Default: false

	FailIdle     time.Duration // Default: 1 second
	FailReadTime time.Duration // Default: 2 seconds
}

ConsumerConfig is the set of configuration options for a consumer.

Stream: the name of the stream to read messages from

Group: the consumer group; each group processes messages independently of the others

ConsumerName: the client name within the group; it does not have to be unique

BatchSize: (optional) the number of messages read from the stream per request

MaxConcurrency: (optional) the maximum number of goroutines processing messages

NoAck: (optional) when true, messages are not reprocessed after an error

MaxRetries: (optional) the number of times a message is reprocessed after errors

CleaneUp: (optional) automatically delete messages after successful processing

FailReadTime: (optional) the interval at which failed messages are read

FailIdle: (optional) the time after which a message is considered corrupted

Example:
ConsumerConfig{
	Stream:         "mystream",
	Group:          "mygroup",
	ConsumerName:   "consumer",
	BatchSize:      100,
	MaxConcurrency: 50,
	MaxRetries:     3,
}
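
The example leaves the optional fields unset, so the defaults from the struct comments apply. The sketch below shows one way such defaulting could work. The `config` type is a local mirror of a few ConsumerConfig fields and `withDefaults` is hypothetical; the library may apply its defaults differently.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors a few ConsumerConfig fields so the sketch compiles
// without the library.
type config struct {
	BatchSize      int64
	MaxConcurrency int64
	MaxRetries     int64
	FailIdle       time.Duration
	FailReadTime   time.Duration
}

// withDefaults is hypothetical. It fills zero-valued fields with the
// defaults listed in the ConsumerConfig struct comments.
func withDefaults(c config) config {
	if c.BatchSize == 0 {
		c.BatchSize = 1
	}
	if c.MaxConcurrency == 0 {
		c.MaxConcurrency = 1
	}
	if c.FailIdle == 0 {
		c.FailIdle = time.Second
	}
	if c.FailReadTime == 0 {
		c.FailReadTime = 2 * time.Second
	}
	return c
}

func main() {
	c := withDefaults(config{BatchSize: 100, MaxConcurrency: 50, MaxRetries: 3})
	fmt.Println(c.BatchSize, c.MaxConcurrency, c.MaxRetries, c.FailIdle, c.FailReadTime)
	// prints: 100 50 3 1s 2s
}
```

MaxRetries needs no special handling here because its documented default, 0, is already the zero value.
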

type Producer

type Producer[E any] struct {
	// contains filtered or unexported fields
}

Producer is a wrapper that makes it easy to publish messages to a Redis stream.

func NewProducer

func NewProducer[E any](client RedisClient) Producer[E]

NewProducer is the constructor for the Producer struct.

func NewProducerWithConverter added in v0.1.1

func NewProducerWithConverter[E any](client RedisClient, convertFrom func(event *E) (map[string]interface{}, error)) Producer[E]

NewProducerWithConverter is the constructor for a Producer struct with a custom converter.

Since Redis Streams messages are limited to a flat structure, the following layouts are available:

  • flat: ("foo_key", "foo_val", "bar_key", "bar_val")
  • nested JSON or proto under a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}")
  • a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}")
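
On the producing side, a converter for the combined layout might look like the following sketch. It relies only on the standard library. The `Order` and `Customer` models and the field names `id` and `customer` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Customer and Order are hypothetical models used only in this sketch.
type Customer struct {
	Name string `json:"name"`
	City string `json:"city"`
}

type Order struct {
	ID       string
	Customer Customer
}

// convertFrom has the shape NewProducerWithConverter expects for E = Order.
// It keeps ID as a flat field and packs Customer into one JSON string.
func convertFrom(event *Order) (map[string]interface{}, error) {
	customer, err := json.Marshal(event.Customer)
	if err != nil {
		return nil, fmt.Errorf("encode customer: %w", err)
	}
	return map[string]interface{}{
		"id":       event.ID,
		"customer": string(customer),
	}, nil
}

func main() {
	fields, err := convertFrom(&Order{ID: "42", Customer: Customer{Name: "Ann", City: "Oslo"}})
	fmt.Println(fields, err)
}
```

Keeping frequently inspected values such as an ID as flat fields makes them readable in the raw stream entry, while the nested part stays in a single field.
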

func (Producer[E]) Produce

func (p Producer[E]) Produce(ctx context.Context, event E, stream string) error

Produce pushes a message to a Redis stream.

With the default converter, the resulting Redis command looks like:

  • "xadd" "mystream" "*" "body" "{\"Message\":\"message\",\"Name\":\"name\",\"Foo\":712,\"Bar\":947}"
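
The body in that command can be reproduced with the standard library. The sketch below assumes the default converter marshals the event with encoding/json and stores the result under a "body" key; `bodyField` is hypothetical, and `Event` is trimmed to the four fields that appear in the command.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a trimmed copy of the README model, limited to the four fields
// that appear in the command above.
type Event struct {
	Message string
	Name    string
	Foo     int
	Bar     int
}

// bodyField is hypothetical. It assumes the event is marshalled with
// encoding/json and stored under the "body" key.
func bodyField(e Event) (map[string]interface{}, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{"body": string(b)}, nil
}

func main() {
	fields, err := bodyField(Event{Message: "message", Name: "name", Foo: 712, Bar: 947})
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["body"])
	// prints: {"Message":"message","Name":"name","Foo":712,"Bar":947}
}
```
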
Example
type ProducerEvent struct {
	Foo string
	Bar int
}

producer := goxstreams.NewProducer[ProducerEvent](redis.NewClient(&redis.Options{Addr: "localhost:6379"}))

_ = producer.Produce(context.Background(), ProducerEvent{"foo", 1}, "mystream")

type RedisBrokenMessage added in v0.1.0

type RedisBrokenMessage struct {
	ID         string
	RetryCount int
	Body       map[string]interface{}
	Error      error
}

RedisBrokenMessage is passed to the Worker.ProcessBroken method. It contains the event body and additional information.

type RedisClient

type RedisClient interface {
	redis.Cmdable
}

RedisClient is an interface so that a cluster client can be used as well.

type RedisMessage added in v0.1.0

type RedisMessage[E any] struct {
	ID         string
	RetryCount int
	Body       E
}

RedisMessage is passed to the Worker.Process and Worker.ProcessDead methods. It contains the event body and additional information.

type Worker

type Worker[E any] interface {
	Process(event RedisMessage[E]) error
	ProcessBroken(event RedisBrokenMessage) error
	ProcessDead(event RedisMessage[E]) error
}

Worker is the interface for processing messages from a Redis stream.
