segmenter

package module
v0.1.1
Published: Mar 28, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

README

segmenter

Package segmenter implements partitioning over Redis streams.

Redis streams are fast and easy to use, but sometimes messages must be processed in order. This library guarantees that all messages sharing a partition key are processed in order by a single consumer. It also rebalances automatically: if a consumer is added or removed (for example, because it dies), the partitions are rebalanced and the ordering property is preserved.

Segmenter

This is how the segmenter is initialized. A namespace is the logical container for a segmenter. You can initialize multiple segmenters in the same app with different namespaces, or in different apps with the same namespace (provided the underlying Redis connection is the same). Once initialized, a segmenter can be used to register streams and consumers.

c := segmenter.Config{
	RedisOptions: &redis.Options{Addr: "localhost:6379"},
	NameSpace:    "namespace",
	Debug:        false,
}
s, err := segmenter.NewSegmenter(&c)
if err != nil {
	log.Fatalf("NewSegmenter(), err = %v", err)
}

Stream

A stream can also be registered by any other app, provided the segmenter namespace and Redis backend are the same. This enables some of your apps to act only as producers and others only as consumers.

// Register a stream named "segmenter" with 2 partitions and a partition size of 250
st, err := s.RegisterStream(ctx, "segmenter", 2, 250)
if err != nil {
	log.Fatalf("RegisterStream(), err = %v", err)
}

After registering, you can use this stream to send messages.

// Send 10 messages to the stream; they are divided across the stream's two partitions
for i := 0; i < 10; i++ {
	uuid := fmt.Sprintf("uuid_%d", rand.Intn(1000))
	_, _ = st.Send(context.TODO(), &contracts.PMessage{
		Data:         []byte(fmt.Sprintf("Message with uuid : %s", uuid)),
		PartitionKey: uuid,
	})
}

Consumer

As with a stream, you register a consumer through the segmenter. The stream must be registered with the segmenter before you register the consumer; otherwise you will get an ErrorNonExistentStream error.

// Here we are registering a consumer
// stream : "segmenter"
// group : "group1"
// batchSize : 10 (similar to redis count)
// maxProcessingTime : 1 sec. If the message is not acked within 1 sec we will try to redeliver it
c1, err := s.RegisterConsumer(ctx, "segmenter", "group1", 10, time.Second)
if err != nil {
	log.Fatalf("Consumer1 : RegisterConsumer() err = %v", err)
}

Once you have read a message, you can ack it so that it is marked as processed.

// m is a *contracts.CMessage returned by c1.Read
err = c1.Ack(ctx, m)
if err != nil {
	log.Printf("Consumer1 : Ack() err = %v", err)
}
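
The snippets above register a consumer and ack a message but do not show the read step in between. The following is a minimal sketch of a read, process, ack loop. To stay runnable without Redis it makes several assumptions: a local `reader` interface shaped like `Consumer.Read` and `Consumer.Ack`, a stand-in `message` type in place of `contracts.CMessage` (its fields are hypothetical), and an in-memory `fakeConsumer`. The names `drain` and `demo` are illustrative and not part of the library.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// message is a stand-in for contracts.CMessage; its fields are hypothetical.
type message struct {
	ID   string
	Data []byte
}

// reader mirrors the shape of Consumer.Read and Consumer.Ack, using the
// stand-in message type instead of *contracts.CMessage.
type reader interface {
	Read(ctx context.Context, maxWaitDuration time.Duration) ([]*message, error)
	Ack(ctx context.Context, m *message) error
}

// drain reads one batch and handles the messages in order. It acks a message
// only after its handler succeeds and stops at the first failure, so that a
// later message is never processed ahead of a failed one.
// It returns the number of acked messages.
func drain(ctx context.Context, r reader, handle func(*message) error) (int, error) {
	msgs, err := r.Read(ctx, 100*time.Millisecond)
	if err != nil {
		return 0, err
	}
	acked := 0
	for _, m := range msgs {
		if err := handle(m); err != nil {
			return acked, err // message stays unacked
		}
		if err := r.Ack(ctx, m); err != nil {
			return acked, err
		}
		acked++
	}
	return acked, nil
}

// fakeConsumer is an in-memory reader that exists only to make the sketch runnable.
type fakeConsumer struct {
	pending []*message
	acked   map[string]bool
}

func (f *fakeConsumer) Read(ctx context.Context, _ time.Duration) ([]*message, error) {
	return f.pending, nil
}

func (f *fakeConsumer) Ack(ctx context.Context, m *message) error {
	f.acked[m.ID] = true
	return nil
}

// demo runs drain once against a fake consumer holding two messages.
func demo() (int, *fakeConsumer, error) {
	f := &fakeConsumer{
		pending: []*message{{ID: "1-0", Data: []byte("a")}, {ID: "2-0", Data: []byte("b")}},
		acked:   map[string]bool{},
	}
	n, err := drain(context.Background(), f, func(m *message) error {
		fmt.Printf("processing %s: %s\n", m.ID, m.Data)
		return nil
	})
	return n, f, err
}

func main() {
	n, _, err := demo()
	fmt.Println("acked:", n, "err:", err)
}
```

With the real library the same loop would call c1.Read and c1.Ack directly. Stopping at the first handler failure is a design choice made here to keep per-key ordering intact; it is not something the source prescribes.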

You can shut down a consumer using the ShutDown method. This causes its partitions to be rebalanced across the remaining consumers in the group.

err = c1.ShutDown()
if err != nil {
	log.Fatalf("Error happened while shutting down c1, %v", err)
}
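
To make the effect of a shutdown concrete, here is a small sketch of how partitions might be redistributed when the set of active consumers changes. The round-robin strategy and the `assign` helper are assumptions made for illustration; the source does not describe the library's actual rebalancing algorithm, which may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// assign distributes partitions 0..partitionCount-1 across the given consumer
// IDs in round-robin order. Hypothetical helper: the library's real
// rebalancing strategy is not documented here and may differ.
func assign(consumerIDs []string, partitionCount int) map[string][]int {
	// Sort a copy so that the plan does not depend on the input order.
	ids := append([]string(nil), consumerIDs...)
	sort.Strings(ids)

	plan := make(map[string][]int, len(ids))
	if len(ids) == 0 {
		return plan
	}
	for p := 0; p < partitionCount; p++ {
		id := ids[p%len(ids)]
		plan[id] = append(plan[id], p)
	}
	return plan
}

func main() {
	fmt.Println(assign([]string{"c1", "c2"}, 4)) // two active consumers
	fmt.Println(assign([]string{"c1"}, 4))       // after c2 shuts down
}
```

In this sketch each partition always has exactly one owner, which is the property that lets a single consumer process all messages for a given partition key in order.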

For more details, check out the tests/e2e package. It contains end-to-end tests that explain these steps more fully.

Documentation

Overview

Package segmenter implements partitioning over Redis streams.

Index

Constants

This section is empty.

Variables

var ConsumerDeadError = errors.New("consumer shut down")
var ErrorEmptyGroupName = fmt.Errorf("group cannot be empty")
var ErrorEmptyStreamName = fmt.Errorf("stream name cannot be empty")
var ErrorInvalidBatchSize = fmt.Errorf("batch size cannot less than 1")
var ErrorInvalidPartitionCount = fmt.Errorf("partition count cannot less than 1")
var ErrorInvalidPartitionSize = fmt.Errorf("partition size cannot less than 1")
var ErrorNonExistentStream = fmt.Errorf("stream do not exist")
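
Because these are exported sentinel errors, callers can compare against them. The sketch below shows one way to do that with `errors.Is`, which matches whether the sentinel is returned directly or wrapped. It is self-contained: `errNonExistentStream` stands in for `segmenter.ErrorNonExistentStream`, and `registerConsumer` and `isMissingStream` are hypothetical helpers. Whether the library wraps its errors is not stated in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// errNonExistentStream stands in for segmenter.ErrorNonExistentStream so that
// the sketch compiles without the library.
var errNonExistentStream = fmt.Errorf("stream do not exist")

// registerConsumer is a hypothetical stand-in for a call that fails because
// the stream was never registered. It wraps the sentinel with extra context.
func registerConsumer(stream string) error {
	return fmt.Errorf("register consumer on %q: %w", stream, errNonExistentStream)
}

// isMissingStream reports whether err is, or wraps, the sentinel error.
func isMissingStream(err error) bool {
	return errors.Is(err, errNonExistentStream)
}

func main() {
	err := registerConsumer("segmenter")
	if isMissingStream(err) {
		fmt.Println("register the stream first:", err)
	}
}
```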

Functions

This section is empty.

Types

type Config

type Config struct {
	// RedisOptions : This is the same as provided by the go-redis redis/v8 package
	RedisOptions *redis.Options

	// NameSpace : All the keys in redis for this segmenter instance will be prefixed by __nameSpace
	NameSpace string

	// Debug : boolean flag to enable debug logs in the segmenter
	Debug bool
}

Config : Config to initialize the segmenter

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer : This is a segmenter consumer. When you register a consumer with the segmenter you get an instance of the consumer. This will be used to read/ack messages in the stream.

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, cmessage *contracts.CMessage) error

Ack : Acknowledge messages in the redis stream

func (*Consumer) GetID

func (c *Consumer) GetID() string

GetID : Returns the unique id assigned to the consumer

func (*Consumer) GetNameSpace

func (c *Consumer) GetNameSpace() string

GetNameSpace : Returns the name space of registered consumer. This will always be the same as the namespace of the stream

func (*Consumer) GetStreamName

func (c *Consumer) GetStreamName() string

GetStreamName : Returns the name of the stream against which this consumer is registered

func (*Consumer) IsActive

func (c *Consumer) IsActive() bool

IsActive : Returns true if a consumer is active

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, maxWaitDuration time.Duration) ([]*contracts.CMessage, error)

Read : Reads from the stream this consumer is registered with. It will only read from the partitions assigned to the consumer.

func (*Consumer) ShutDown

func (c *Consumer) ShutDown() error

ShutDown : This will shut down the consumer. You will no longer be able to read or ack messages via this consumer. As a result, the partitions that were assigned to this consumer will be rebalanced and assigned to other consumers in the group.

type Segmenter

type Segmenter struct {
	// contains filtered or unexported fields
}

Segmenter : Struct exposed by the library to register streams and consumers

func NewSegmenter

func NewSegmenter(c *Config) (*Segmenter, error)

func (*Segmenter) RegisterConsumer

func (s *Segmenter) RegisterConsumer(ctx context.Context, name string, group string, batchSize int64, maxProcessingTime time.Duration) (*Consumer, error)

RegisterConsumer : Registers a consumer with the segmenter against a stream and group, with the given batchSize and maxProcessingTime

func (*Segmenter) RegisterStream

func (s *Segmenter) RegisterStream(ctx context.Context, name string, pcount int, psize int64) (*Stream, error)

RegisterStream : Registers a stream with segmenter with given partition count and partition size

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream : It represents a segmenter stream. You will use this to send messages to the stream. It will route your messages to the appropriate partition based on the PartitionKey embedded in the message.
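
As an illustration of key-based routing, the sketch below maps a partition key to a partition index by hashing it and taking the result modulo the partition count. The `partitionFor` helper and the choice of FNV-1a are assumptions for illustration only; the source does not say which hash or mapping the library uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key to a partition index in [0, count).
// Hypothetical helper: FNV-1a is an assumption made for illustration and may
// differ from what the library does internally. count must be at least 1.
func partitionFor(key string, count int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(count))
}

func main() {
	// The same key always lands on the same partition.
	for _, key := range []string{"uuid_1", "uuid_2", "uuid_1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 2))
	}
}
```

Any deterministic mapping of this kind sends every message with the same key to the same partition, which is what makes ordered, single-consumer processing per key possible.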

func (*Stream) GetName

func (s *Stream) GetName() string

GetName : Returns the name of the stream

func (*Stream) GetPartitionCount

func (s *Stream) GetPartitionCount() int

GetPartitionCount : Returns partition count for the stream

func (*Stream) GetPartitionSize

func (s *Stream) GetPartitionSize() int64

GetPartitionSize : Returns the size of the partitions for the stream

func (*Stream) Send

func (s *Stream) Send(ctx context.Context, m *contracts.PMessage) (string, error)

Send : Send messages to the stream

Directories

Path Synopsis
api
internal
tests
tests/e2e
