Documentation ¶
Overview ¶
Package goxstreams lets you post and process messages asynchronously using Redis Streams.
Index ¶
- type Consumer
- type ConsumerConfig
- type Producer
- func (p Producer[E]) Produce(ctx context.Context, event E, stream string) error
- func (p Producer[E]) ProduceBatch(ctx context.Context, events []E, stream string) error
- func (p Producer[E]) WithConverter(convertFrom func(event *E) (map[string]interface{}, error)) Producer[E]
- func (p Producer[E]) WithCtxTransmmiter(transmitCtx func(ctx context.Context) (string, map[string]string)) Producer[E]
- type RedisBrokenMessage
- type RedisClient
- type RedisMessage
- type Worker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer[E any] struct {
	// contains filtered or unexported fields
}
Consumer is a wrapper for easily reading messages from a Redis stream.
func NewConsumer ¶
func NewConsumer[E any](
	ctx context.Context,
	client RedisClient,
	worker Worker[E],
	config ConsumerConfig,
) (Consumer[E], error)
NewConsumer is the constructor for the Consumer struct.
func (Consumer[E]) Run ¶
Run starts processing messages from the Redis stream.
It starts two processes: one that reads new messages with XREADGROUP and one that reclaims pending messages with XPENDING and XCLAIM. To stop both, cancel the context.
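The stop-by-cancellation behaviour can be sketched with the standard library alone. The `runLoop` function below is a hypothetical stand-in for a blocking call such as Run (whose signature is not shown in this documentation); it only illustrates that cancelling the context is the stop signal.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop is a hypothetical stand-in for a blocking consumer loop.
// It ticks until ctx is cancelled and returns the number of ticks seen.
func runLoop(ctx context.Context, interval time.Duration) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	ticks := 0
	for {
		select {
		case <-ctx.Done():
			return ticks
		case <-ticker.C:
			ticks++
		}
	}
}

// stopsOnCancel starts runLoop in a goroutine, cancels the context,
// and reports whether the loop returned before the timeout expired.
func stopsOnCancel(timeout time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		runLoop(ctx, time.Millisecond)
	}()

	cancel() // the only stop signal the loop needs
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	fmt.Println("stopped:", stopsOnCancel(time.Second))
}
```

In a real service the cancellable context would typically come from `signal.NotifyContext`, so that an interrupt signal ends processing.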
func (Consumer[E]) WithConverter ¶ added in v0.3.0
func (c Consumer[E]) WithConverter(
	convertTo func(event map[string]interface{}) (*E, error),
) Consumer[E]
WithConverter sets a custom function for converting stream data into an event.
Since Redis Streams messages are limited to a flat structure, there are three options:
- flat, for example: ("foo_key", "foo_val", "bar_key", "bar_val");
- nested JSON or proto in a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}");
- a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}").
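A minimal sketch of a function with the `convertTo` shape, decoding the combination layout. The `Order` type and the `id` and `items` field names are hypothetical, and the sketch assumes field values arrive as strings.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Order is a hypothetical event type used only for this sketch.
type Order struct {
	ID    string
	Items map[string]string
}

// convertTo decodes the "combination" layout: a flat "id" field plus
// a JSON document stored under the "items" key.
func convertTo(event map[string]interface{}) (*Order, error) {
	id, ok := event["id"].(string)
	if !ok {
		return nil, errors.New("id field is missing or not a string")
	}
	raw, ok := event["items"].(string)
	if !ok {
		return nil, errors.New("items field is missing or not a string")
	}
	order := &Order{ID: id}
	if err := json.Unmarshal([]byte(raw), &order.Items); err != nil {
		return nil, fmt.Errorf("decode items: %w", err)
	}
	return order, nil
}

func main() {
	order, err := convertTo(map[string]interface{}{
		"id":    "42",
		"items": `{"foo_key": "foo_val", "bar_key": "bar_val"}`,
	})
	fmt.Println(order, err)
}
```

A function like this would be passed to WithConverter on a `Consumer[Order]`.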
func (Consumer[E]) WithCtxReceiver ¶ added in v0.3.0
func (c Consumer[E]) WithCtxReceiver(
	receiveCtx func(ctx context.Context, event map[string]interface{}) context.Context,
) Consumer[E]
WithCtxReceiver sets a function for receiving context fields from an event.
In a Redis event, the context fields look like: "ctx_field" "{\"any_info\":\"info\", \"trace_id\": \"my_trace_id\"}"
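A minimal sketch of a function with the `receiveCtx` shape, assuming the context data is stored as a JSON string under the `ctx_field` key, as in the sample. The `traceKey` type and the `traceIDFrom` helper are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// traceKey is an unexported context key type, which avoids collisions.
type traceKey struct{}

// receiveCtx reads the JSON stored under "ctx_field" and copies its
// "trace_id" value into the returned context. Missing or malformed
// data leaves the context unchanged.
func receiveCtx(ctx context.Context, event map[string]interface{}) context.Context {
	raw, ok := event["ctx_field"].(string)
	if !ok {
		return ctx
	}
	var fields map[string]string
	if err := json.Unmarshal([]byte(raw), &fields); err != nil {
		return ctx
	}
	if id, ok := fields["trace_id"]; ok {
		return context.WithValue(ctx, traceKey{}, id)
	}
	return ctx
}

// traceIDFrom runs receiveCtx on an event and returns the trace ID
// found in the resulting context, or "" if there is none.
func traceIDFrom(event map[string]interface{}) string {
	ctx := receiveCtx(context.Background(), event)
	id, _ := ctx.Value(traceKey{}).(string)
	return id
}

func main() {
	event := map[string]interface{}{
		"ctx_field": `{"any_info":"info", "trace_id": "my_trace_id"}`,
	}
	fmt.Println(traceIDFrom(event)) // prints my_trace_id
}
```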
type ConsumerConfig ¶
type ConsumerConfig struct {
	Stream         string
	Group          string
	ConsumerName   string
	BatchSize      int64         // Default: 10
	MaxConcurrency int64         // Default: 20
	NoAck          bool          // Default: false
	MaxRetries     int64         // Default: 0
	CleaneUp       bool          // Default: false
	ReadInterval   time.Duration // Default: 1 second
	FailIdle       time.Duration // Default: 1 minute
}
ConsumerConfig is the set of configuration options for a consumer.
	Stream: name of the stream to read from
	Group: each group processes messages independently of the others
	ConsumerName: client name in the group, may not be unique
	BatchSize: (optional) the number of messages read from the stream per request
	MaxConcurrency: (optional) maximum number of message-processing goroutines
	NoAck: (optional) when true, messages are not reprocessed after an error
	MaxRetries: (optional) the number of times a message is reprocessed on errors
	CleaneUp: (optional) automatic deletion of messages after successful processing
	ReadInterval: (optional) read interval for failed messages
	FailIdle: (optional) the time after which a message is considered corrupted
Example:
	ConsumerConfig{
		Stream:         "mystream",
		Group:          "mygroup",
		ConsumerName:   "consumer",
		BatchSize:      100,
		MaxConcurrency: 50,
		MaxRetries:     3,
	}
type Producer ¶
type Producer[E any] struct {
	// contains filtered or unexported fields
}
Producer is a wrapper for easily producing messages to a Redis stream.
func NewProducer ¶
func NewProducer[E any](client RedisClient) Producer[E]
NewProducer is the constructor for the Producer struct.
func (Producer[E]) Produce ¶
Produce pushes a message to a Redis stream.
With the default converter, the Redis message looks like:
- "xadd" "mystream" "*" "body" "{\"Message\":\"message\",\"Name\":\"name\",\"Foo\":712,\"Bar\":947}"
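The sample message is consistent with JSON-encoding the whole event and storing it under a single `body` field. The sketch below reproduces that payload under this assumption; `Event` and `toBody` are hypothetical names, and this is not the library's own converter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical type whose fields match the sample message.
type Event struct {
	Message string
	Name    string
	Foo     int
	Bar     int
}

// toBody JSON-encodes the event and stores it under the "body" key.
// It mirrors the sample message; it is not the library's own code.
func toBody(event Event) (map[string]interface{}, error) {
	data, err := json.Marshal(event)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{"body": string(data)}, nil
}

func main() {
	fields, err := toBody(Event{Message: "message", Name: "name", Foo: 712, Bar: 947})
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["body"])
	// prints {"Message":"message","Name":"name","Foo":712,"Bar":947}
}
```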
Example ¶
	type ProducerEvent struct {
		Foo string
		Bar int
	}

	producer := goxstreams.NewProducer[ProducerEvent](redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
	_ = producer.Produce(context.Background(), ProducerEvent{"foo", 1}, "mystream")
func (Producer[E]) ProduceBatch ¶ added in v0.2.1
func (Producer[E]) WithConverter ¶ added in v0.3.0
func (p Producer[E]) WithConverter(convertFrom func(event *E) (map[string]interface{}, error)) Producer[E]
WithConverter sets a custom function for converting an event into stream data.
Since Redis Streams messages are limited to a flat structure, there are three options:
- flat, for example: ("foo_key", "foo_val", "bar_key", "bar_val");
- nested JSON or proto in a single key: ("key", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}");
- a combination of both: ("foo_key", "foo_val", "foobar", "{\"foobar\": {\"foo_key\": \"foo_val\", \"bar_key\": \"bar_val\"}}").
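A minimal sketch of a function with the `convertFrom` shape, producing the combination layout: one flat field plus one JSON-encoded field. The `Order` type and the `id` and `items` field names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is a hypothetical event type used only for this sketch.
type Order struct {
	ID    string
	Items map[string]string
}

// convertFrom writes ID as a flat field and JSON-encodes Items under
// a single "items" key (the "combination" layout).
func convertFrom(event *Order) (map[string]interface{}, error) {
	items, err := json.Marshal(event.Items)
	if err != nil {
		return nil, fmt.Errorf("encode items: %w", err)
	}
	return map[string]interface{}{
		"id":    event.ID,
		"items": string(items),
	}, nil
}

func main() {
	fields, err := convertFrom(&Order{ID: "42", Items: map[string]string{"foo_key": "foo_val"}})
	fmt.Println(fields, err)
}
```

A function like this would be passed to WithConverter on a `Producer[Order]`; a consumer reading the same stream needs a converter that reverses it.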
func (Producer[E]) WithCtxTransmmiter ¶ added in v0.3.0
func (p Producer[E]) WithCtxTransmmiter(transmitCtx func(ctx context.Context) (string, map[string]string)) Producer[E]
WithCtxTransmmiter sets a function for passing context fields along with an event.
In a Redis event, the context fields look like: "ctx_field" "{\"any_info\":\"info\", \"trace_id\": \"my_trace_id\"}"
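A minimal sketch of a function with the `transmitCtx` shape. It assumes, based on the signature and the sample, that the returned string is the stream field name and the map holds the values stored under it. The `traceKey` type and the `fieldsFor` helper are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// traceKey is an unexported context key type, which avoids collisions.
type traceKey struct{}

// transmitCtx returns the stream field name and the context values to
// store under it. Only the trace ID is forwarded in this sketch.
func transmitCtx(ctx context.Context) (string, map[string]string) {
	fields := map[string]string{}
	if id, ok := ctx.Value(traceKey{}).(string); ok {
		fields["trace_id"] = id
	}
	return "ctx_field", fields
}

// fieldsFor puts traceID into a context and returns what transmitCtx
// extracts from it.
func fieldsFor(traceID string) (string, map[string]string) {
	ctx := context.WithValue(context.Background(), traceKey{}, traceID)
	return transmitCtx(ctx)
}

func main() {
	key, fields := fieldsFor("my_trace_id")
	fmt.Println(key, fields) // prints ctx_field map[trace_id:my_trace_id]
}
```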
type RedisBrokenMessage ¶ added in v0.1.0
RedisBrokenMessage is passed to the Worker.ProcessBroken method. It contains the event body and additional info.
type RedisClient ¶
type RedisClient interface {
	redis.StreamCmdable
	redis.Cmdable
}
RedisClient is the client interface; it is required so that a cluster client can be used.
type RedisMessage ¶ added in v0.1.0
RedisMessage is passed to the Worker.Process and Worker.ProcessDead methods. It contains the event body and additional info.
type Worker ¶
type Worker[E any] interface {
	Process(ctx context.Context, event RedisMessage[E]) error
	ProcessBroken(ctx context.Context, event RedisBrokenMessage) error
	ProcessDead(ctx context.Context, event RedisMessage[E]) error
}
Worker is an interface for processing messages from a Redis stream.
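A minimal sketch of a type that satisfies this interface. So that it compiles on its own, it declares local stand-ins for RedisMessage and RedisBrokenMessage; their `Body` fields are assumptions, since the real field layout is not shown in this documentation. `Greeting`, `countingWorker`, and `handle` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// RedisMessage and RedisBrokenMessage are local stand-ins so that the
// sketch compiles on its own. Their Body fields are assumptions.
type RedisMessage[E any] struct{ Body E }
type RedisBrokenMessage struct{ Body map[string]interface{} }

// Worker repeats the interface from the documentation.
type Worker[E any] interface {
	Process(ctx context.Context, event RedisMessage[E]) error
	ProcessBroken(ctx context.Context, event RedisBrokenMessage) error
	ProcessDead(ctx context.Context, event RedisMessage[E]) error
}

// Greeting is a hypothetical event type.
type Greeting struct{ Name string }

// countingWorker counts the messages each method receives.
type countingWorker struct{ processed, broken, dead int }

// Compile-time check that *countingWorker satisfies Worker[Greeting].
var _ Worker[Greeting] = (*countingWorker)(nil)

func (w *countingWorker) Process(_ context.Context, event RedisMessage[Greeting]) error {
	if event.Body.Name == "" {
		return errors.New("empty name")
	}
	w.processed++
	return nil
}

func (w *countingWorker) ProcessBroken(_ context.Context, _ RedisBrokenMessage) error {
	w.broken++
	return nil
}

func (w *countingWorker) ProcessDead(_ context.Context, _ RedisMessage[Greeting]) error {
	w.dead++
	return nil
}

// handle wraps name in a message and passes it to the worker's Process method.
func handle(w Worker[Greeting], name string) error {
	return w.Process(context.Background(), RedisMessage[Greeting]{Body: Greeting{Name: name}})
}

func main() {
	w := &countingWorker{}
	okErr := handle(w, "gopher")
	badErr := handle(w, "")
	fmt.Println(okErr, badErr, w.processed)
}
```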