Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultConsumerGroupOptions = &ConsumerGroupOptions{ BlockDuration: time.Millisecond * 10, MessagesBufferSize: 100, InitialOffset: "0", ReclaimPendingMessagesInterval: time.Second * 5, ReturnErrors: false, }
DefaultConsumerGroupOptions is the default options Topic and GroupID is empty and should be set on usage
var DefaultProducerOptions = &ProducerOptions{ StreamLength: 10000, ExactLength: false, CreateStreamIfNotExists: true, }
DefaultProducerOptions is the default producer options
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a consumer group of redis
func NewConsumerGroup ¶
func NewConsumerGroup(rdb *redis.Client, opts ...*ConsumerGroupOptions) (*ConsumerGroup, error)
NewConsumerGroup creates a new consumer group It uses DefaultConsumerGroupOptions when not any opts provided
func (*ConsumerGroup) Ack ¶
func (c *ConsumerGroup) Ack(ctx context.Context, m Message) error
Ack acknowledges message Every message should be acknowledged after processing To avoid double process
func (*ConsumerGroup) Close ¶
func (c *ConsumerGroup) Close() error
Close closes the consumer group and its internal channels
func (*ConsumerGroup) Consume ¶
func (c *ConsumerGroup) Consume(ctx context.Context) (<-chan Message, error)
func (*ConsumerGroup) Errors ¶
func (c *ConsumerGroup) Errors() <-chan error
type ConsumerGroupOptions ¶
type ConsumerGroupOptions struct { // GroupID is the name of the consumer group GroupID string // Topic is the name of the topic Topic string // BlockDuration is the duration to wait for new messages // Default: 1 second BlockDuration time.Duration // MessagesBufferSize is the size of the messages buffer // Default: 100 MessagesBufferSize int // InitialOffset is the initial offset of the consumer group InitialOffset string // ReclaimPendingMessagesInterval is the interval to reclaim pending messages ReclaimPendingMessagesInterval time.Duration // ReturnErrors is the flag to return errors // Default: false // If true, errors will be returned in the Errors() channel // and should be handled by the user to avoid deadlock ReturnErrors bool }
ConsumerGroupOptions contains options for the consumer group
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a redis stream producer
func (*Producer) Produce ¶
func (p *Producer) Produce(ctx context.Context, topic string, payload []byte, opts ...*ProducerOptions) error
Produce produces a new message to the specified topic When opts is not provided DefaultProducerOptions is used by default To bring your options use the ProducerOptions struct
type ProducerOptions ¶
type ProducerOptions struct { // StreamLength is the max length of the stream StreamLength int // ExactLength specifies if the stream length should be exact ExactLength bool // CreateStreamIfNotExists specifies if the stream should be created if it doesn't exist CreateStreamIfNotExists bool }
ProducerOptions contains options for the producer