redistream

Version: v0.0.0-...-34ce376
Published: Aug 30, 2024 License: MIT Imports: 9 Imported by: 0

README

Redistream

Redistream is a wrapper around the Redis stream commands. It provides a higher-level API for working with streams.

Installation

go get github.com/morilog/redistream

Usage

See examples for a complete usage example.

Features

  • Simple producer (Redis itself is simple too)
  • Handle messages with native Go channels
  • Handle errors asynchronously
  • Reclaim and handle pending messages continuously
  • Preserve message ordering by using a single consumer in the consumer group

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

var DefaultConsumerGroupOptions = &ConsumerGroupOptions{
	BlockDuration:                  time.Millisecond * 10,
	MessagesBufferSize:             100,
	InitialOffset:                  "0",
	ReclaimPendingMessagesInterval: time.Second * 5,
	ReturnErrors:                   false,
}

DefaultConsumerGroupOptions holds the default consumer group options. Topic and GroupID are empty and must be set before use.
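
Because DefaultConsumerGroupOptions is a pointer to a shared value, setting Topic and GroupID directly on it would also change the defaults seen by every other caller. The following is a minimal sketch of copying the defaults before filling in the required fields. It uses a local stand-in struct that mirrors the documented fields instead of importing redistream, and the helper optionsFor is hypothetical, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// ConsumerGroupOptions is a local stand-in that mirrors the documented fields.
type ConsumerGroupOptions struct {
	GroupID                        string
	Topic                          string
	BlockDuration                  time.Duration
	MessagesBufferSize             int
	InitialOffset                  string
	ReclaimPendingMessagesInterval time.Duration
	ReturnErrors                   bool
}

// DefaultConsumerGroupOptions repeats the documented default values.
var DefaultConsumerGroupOptions = &ConsumerGroupOptions{
	BlockDuration:                  time.Millisecond * 10,
	MessagesBufferSize:             100,
	InitialOffset:                  "0",
	ReclaimPendingMessagesInterval: time.Second * 5,
	ReturnErrors:                   false,
}

// optionsFor is a hypothetical helper: it copies the defaults by value and
// sets the two required fields, leaving the shared defaults untouched.
func optionsFor(topic, groupID string) *ConsumerGroupOptions {
	opts := *DefaultConsumerGroupOptions // struct copy, not a pointer copy
	opts.Topic = topic
	opts.GroupID = groupID
	return &opts
}

func main() {
	opts := optionsFor("orders", "billing")
	fmt.Println(opts.Topic, opts.GroupID, opts.MessagesBufferSize)
	fmt.Println("defaults unchanged:", DefaultConsumerGroupOptions.Topic == "")
}
```

With the real package, the resulting value would presumably be passed as the opts argument of NewConsumerGroup.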

var DefaultProducerOptions = &ProducerOptions{
	StreamLength:            10000,
	ExactLength:             false,
	CreateStreamIfNotExists: true,
}

DefaultProducerOptions is the default producer options

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a Redis consumer group.

func NewConsumerGroup

func NewConsumerGroup(rdb *redis.Client, opts ...*ConsumerGroupOptions) (*ConsumerGroup, error)

NewConsumerGroup creates a new consumer group. It uses DefaultConsumerGroupOptions when no opts are provided.

func (*ConsumerGroup) Ack

func (c *ConsumerGroup) Ack(ctx context.Context, m Message) error

Ack acknowledges a message. Every message should be acknowledged after it has been processed, to avoid processing it twice.

func (*ConsumerGroup) Close

func (c *ConsumerGroup) Close() error

Close closes the consumer group and its internal channels

func (*ConsumerGroup) Consume

func (c *ConsumerGroup) Consume(ctx context.Context) (<-chan Message, error)

func (*ConsumerGroup) Errors

func (c *ConsumerGroup) Errors() <-chan error
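
The ReturnErrors option warns that the Errors() channel must be drained to avoid a deadlock. One way to do that is to read messages and errors in the same select loop and to acknowledge a message only after its handler succeeds. The sketch below shows that pattern under stated assumptions: Message and consumer are local stand-ins that mirror the documented signatures, and run, fakeConsumer and demo are hypothetical names used only for illustration. It does not connect to Redis.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a local stand-in with a subset of the documented fields.
type Message struct {
	ID      string
	Payload []byte
}

// consumer mirrors the documented ConsumerGroup methods, using the local
// Message type above rather than the real package.
type consumer interface {
	Consume(ctx context.Context) (<-chan Message, error)
	Errors() <-chan error
	Ack(ctx context.Context, m Message) error
}

// run (hypothetical) drains messages and errors in one select loop so that
// neither channel can fill up and block the other.
func run(ctx context.Context, c consumer, handle func(Message) error, onErr func(error)) error {
	msgs, err := c.Consume(ctx)
	if err != nil {
		return err
	}
	errs := c.Errors()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m, ok := <-msgs:
			if !ok {
				return nil // message channel closed
			}
			if err := handle(m); err != nil {
				onErr(err) // not acknowledged, so the message stays pending
				continue
			}
			if err := c.Ack(ctx, m); err != nil {
				onErr(err)
			}
		case err, ok := <-errs:
			if !ok {
				errs = nil // receiving from a nil channel blocks, disabling this case
				continue
			}
			onErr(err)
		}
	}
}

// fakeConsumer is an in-memory test double; it does not talk to Redis.
type fakeConsumer struct {
	msgs  chan Message
	errs  chan error
	acked []string
}

func (f *fakeConsumer) Consume(context.Context) (<-chan Message, error) { return f.msgs, nil }
func (f *fakeConsumer) Errors() <-chan error                            { return f.errs }
func (f *fakeConsumer) Ack(_ context.Context, m Message) error {
	f.acked = append(f.acked, m.ID)
	return nil
}

// demo feeds two messages through run; the handler rejects the second one.
func demo() (acked []string, failures int) {
	f := &fakeConsumer{msgs: make(chan Message, 2), errs: make(chan error)}
	f.msgs <- Message{ID: "1-0", Payload: []byte("ok")}
	f.msgs <- Message{ID: "2-0", Payload: []byte("bad")}
	close(f.msgs)
	handle := func(m Message) error {
		if string(m.Payload) == "bad" {
			return errors.New("cannot process " + m.ID)
		}
		return nil
	}
	if err := run(context.Background(), f, handle, func(error) { failures++ }); err != nil {
		failures++
	}
	return f.acked, failures
}

func main() {
	acked, failures := demo()
	fmt.Println("acked:", acked, "failures:", failures)
}
```

Skipping Ack on a failed handler leaves the message pending. That fits the reclaim feature listed above, although how and when redistream redelivers such messages is not specified here.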

type ConsumerGroupOptions

type ConsumerGroupOptions struct {
	// GroupID is the name of the consumer group
	GroupID string
	// Topic is the name of the topic
	Topic string
	// BlockDuration is the duration to wait for new messages
	// Default: 10 milliseconds
	BlockDuration time.Duration
	// MessagesBufferSize is the size of the messages buffer
	// Default: 100
	MessagesBufferSize int
	// InitialOffset is the initial offset of the consumer group
	InitialOffset string
	// ReclaimPendingMessagesInterval is the interval to reclaim pending messages
	ReclaimPendingMessagesInterval time.Duration
	// ReturnErrors is the flag to return errors
	// Default: false
	// If true, errors will be returned in the Errors() channel
	// and should be handled by the user to avoid deadlock
	ReturnErrors bool
}

ConsumerGroupOptions contains options for the consumer group

type Message

type Message struct {
	Payload   []byte
	GroupID   string
	Topic     string
	ID        string
	Timestamp time.Time
}

Message represents a single Redis stream entry.
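
Auto-generated Redis stream IDs have the form "<unix-milliseconds>-<sequence>", so the time an entry was added can be recovered from the ID alone. Whether redistream fills the Timestamp field this way is an assumption; the sketch below only illustrates the ID format, and timestampFromID is a hypothetical helper rather than part of the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// timestampFromID (hypothetical) extracts the millisecond part of a Redis
// stream ID of the form "<unix-ms>-<sequence>" and returns it as a UTC time.
func timestampFromID(id string) (time.Time, error) {
	msPart, _, ok := strings.Cut(id, "-")
	if !ok {
		return time.Time{}, fmt.Errorf("malformed stream ID %q", id)
	}
	ms, err := strconv.ParseInt(msPart, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("malformed stream ID %q: %w", id, err)
	}
	return time.UnixMilli(ms).UTC(), nil
}

func main() {
	ts, err := timestampFromID("1725000000000-0")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ts.Format(time.RFC3339))
}
```

This only holds for IDs generated by Redis itself. Entries added with explicit, caller-chosen IDs need not encode a timestamp.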

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a Redis stream producer.

func NewProducer

func NewProducer(rdb *redis.Client) *Producer

NewProducer creates a new producer

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, topic string, payload []byte, opts ...*ProducerOptions) error

Produce publishes a new message to the specified topic. When opts is not provided, DefaultProducerOptions is used. To override the defaults, pass a ProducerOptions value.

type ProducerOptions

type ProducerOptions struct {
	// StreamLength is the max length of the stream
	StreamLength int
	// ExactLength specifies if the stream length should be exact
	ExactLength bool
	// CreateStreamIfNotExists specifies if the stream should be created if it doesn't exist
	CreateStreamIfNotExists bool
}

ProducerOptions contains options for the producer
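
The three producer options line up with arguments of the Redis XADD command: MAXLEN caps the stream length, "~" or "=" selects approximate or exact trimming, and NOMKSTREAM prevents creating a missing stream. The sketch below shows one plausible mapping. How redistream actually builds its command is an assumption; xaddArgs is a hypothetical helper, the ProducerOptions struct is a local stand-in, and the field name "payload" is invented for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ProducerOptions is a local stand-in that mirrors the documented fields.
type ProducerOptions struct {
	StreamLength            int
	ExactLength             bool
	CreateStreamIfNotExists bool
}

// xaddArgs (hypothetical) builds the arguments of an XADD command that
// reflects the given options. The "payload" field name is an assumption.
func xaddArgs(topic string, payload []byte, o ProducerOptions) []string {
	args := []string{"XADD", topic}
	if !o.CreateStreamIfNotExists {
		args = append(args, "NOMKSTREAM") // do not create a missing stream
	}
	if o.StreamLength > 0 {
		op := "~" // approximate trimming, cheaper for Redis
		if o.ExactLength {
			op = "=" // exact trimming
		}
		args = append(args, "MAXLEN", op, strconv.Itoa(o.StreamLength))
	}
	// "*" asks Redis to generate the entry ID.
	return append(args, "*", "payload", string(payload))
}

func main() {
	defaults := ProducerOptions{StreamLength: 10000, ExactLength: false, CreateStreamIfNotExists: true}
	fmt.Println(strings.Join(xaddArgs("orders", []byte("hello"), defaults), " "))
}
```

Approximate trimming lets Redis remove whole internal nodes at once, so the stream may briefly hold somewhat more than StreamLength entries. Exact trimming enforces the limit strictly at a higher cost.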
