Documentation
¶
Overview ¶
Package mq provides a lightweight Redis Streams-based message queue wrapper built on top of go-redis/v9 consumer groups.
Example (PublishAndConsume) ¶
package main
import (
"context"
"fmt"
"hash/fnv"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
"github.com/codesjoy/pkg/basic/xredis/mq"
)
// main runs one publish/consume round trip against a miniredis server started
// in-process: it builds a consumer and a publisher on the "jobs" stream,
// publishes a single message, and prints the payload and "kind" header of the
// first message handed to the handler.
func main() {
	// Shared by the publisher and the consumer so both sides agree on the
	// shard layout and on which shard the consumer owns.
	const (
		shardCount = 4
		orderKey   = "user-42"
	)

	mr, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer mr.Close()

	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	defer client.Close()

	consumer, err := mq.NewConsumer(client, mq.ConsumerConfig{
		Stream:            "jobs",
		Group:             "workers",
		Consumer:          "worker-1",
		AutoCreateGroup:   true,
		OrderedShardCount: shardCount,
		// Own exactly the shard that orderKey hashes to.
		// NOTE(review): assumes mqTestShard mirrors the package's own
		// key-to-shard mapping — confirm against the mq implementation.
		OwnedShards:      []int{mqTestShard(orderKey, shardCount)},
		Block:            10 * time.Millisecond,
		IdleBackoff:      5 * time.Millisecond,
		AutoClaimMinIdle: time.Second,
	})
	if err != nil {
		panic(err)
	}

	// The publisher is constructed once, already configured with the shard
	// count; an earlier unsharded publisher was created and then overwritten
	// without ever being used.
	publisher, err := mq.NewPublisher(client, mq.PublisherConfig{
		DefaultStream:     "jobs",
		OrderedShardCount: shardCount,
	})
	if err != nil {
		panic(err)
	}

	if _, err := publisher.Publish(context.Background(), &mq.Message{
		Payload: []byte("send-email"),
		Headers: map[string]string{
			"kind":        "welcome",
			"x-order-key": orderKey,
		},
	}); err != nil {
		panic(err)
	}

	// The handler closes the consumer after the first message so that Consume
	// returns. The result of Consume is deliberately discarded.
	// NOTE(review): confirm what Consume returns once the consumer is closed
	// from inside the handler before turning this into a hard failure.
	_ = consumer.Consume(
		context.Background(),
		func(_ context.Context, msg *mq.MessageContext) error {
			fmt.Printf("%s %s\n", msg.Message.Payload, msg.Message.Headers["kind"])
			return consumer.Close()
		},
	)
}
// mqTestShard maps key onto a shard index in [0, shardCount) by taking the
// 32-bit FNV-1a hash of key modulo shardCount.
//
// A non-positive shardCount yields shard 0. Without that guard a zero count
// panics with an integer divide by zero, and a negative count wraps around in
// the uint32 conversion and produces an out-of-range shard.
func mqTestShard(key string, shardCount int) int {
	if shardCount <= 0 {
		return 0
	}
	h := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(shardCount))
}
Output: send-email welcome
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors exposed by package mq.
var (
	// ErrNilClient indicates the redis client is nil.
	ErrNilClient = errors.New("mq redis client is nil")
	// ErrNilPublisher indicates the publisher receiver is nil.
	ErrNilPublisher = errors.New("mq publisher is nil")
	// ErrNilConsumer indicates the consumer receiver is nil.
	ErrNilConsumer = errors.New("mq consumer is nil")
	// ErrNilMessage indicates the message is nil.
	ErrNilMessage = errors.New("mq message is nil")
	// ErrNilHandlerFunc indicates the final consumer handler is nil.
	ErrNilHandlerFunc = errors.New("mq handler is nil")
	// ErrMessageStreamRequired indicates the message stream is empty.
	ErrMessageStreamRequired = errors.New("mq message stream is required")
	// ErrConsumerStreamRequired indicates the consumer stream is empty.
	ErrConsumerStreamRequired = errors.New("mq consumer stream is required")
	// ErrConsumerGroupRequired indicates the consumer group is empty.
	ErrConsumerGroupRequired = errors.New("mq consumer group is required")
	// ErrConsumerNameRequired indicates the consumer name is empty.
	ErrConsumerNameRequired = errors.New("mq consumer name is required")
	// ErrConsumerClosed indicates the consumer has already been closed.
	ErrConsumerClosed = errors.New("mq consumer is closed")
	// ErrConsumerActive indicates the consumer is already running.
	ErrConsumerActive = errors.New("mq consumer is already running")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
// Consumer wraps one Redis Streams consumer group loop.
// Instances are obtained from NewConsumer; the struct exposes no public
// fields.
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps one Redis Streams consumer group loop.
func NewConsumer ¶
func NewConsumer(client redis.UniversalClient, cfg ConsumerConfig) (*Consumer, error)
NewConsumer creates a configured consumer wrapper.
type ConsumerConfig ¶
// ConsumerConfig configures Consumer. It is passed by value to NewConsumer,
// and Validate validates and normalizes it.
type ConsumerConfig struct {
	// Stream, Group and Consumer identify the consumer; the package declares
	// ErrConsumerStreamRequired, ErrConsumerGroupRequired and
	// ErrConsumerNameRequired for the cases where one of them is empty.
	Stream, Group, Consumer string

	// NOTE(review): Block and Count presumably bound one blocking read
	// (wait time and batch size) — confirm against the implementation.
	Block time.Duration
	Count int64

	// NOTE(review): AutoCreateGroup and GroupStartID presumably control
	// creation of a missing consumer group and its start ID — confirm.
	AutoCreateGroup bool
	GroupStartID    string

	// Field and header naming; PublisherConfig carries fields with the same
	// names. NOTE(review): presumably both sides must agree — confirm.
	PayloadField, HeaderPrefix, OrderKeyHeader string

	// Shard settings. The package example sets OrderedShardCount to 4 and
	// OwnedShards to the single shard its order key hashes to.
	ShardCount, ShardQueueSize, OrderedShardCount int
	ShardStreamPrefix                             string
	OwnedShards                                   []int

	// NOTE(review): the AutoClaim* fields presumably tune reclaiming of idle
	// pending messages, and IdleBackoff the pause after an empty poll —
	// confirm against the implementation.
	AutoClaimMinIdle time.Duration
	AutoClaimCount   int64
	IdleBackoff      time.Duration
}
ConsumerConfig configures Consumer.
func (*ConsumerConfig) Validate ¶
func (cfg *ConsumerConfig) Validate() error
Validate validates and normalizes consumer config.
type HandlerFunc ¶
// HandlerFunc handles one consumed message. It receives a context and the
// MessageContext for that message and reports failure through its error
// result.
type HandlerFunc func(ctx context.Context, msg *MessageContext) error
HandlerFunc handles one consumed message.
type MessageContext ¶
// MessageContext contains per-message metadata passed to the business
// handler.
type MessageContext struct {
	// Message is the consumed message; the package example reads its Payload
	// and Headers.
	Message *Message

	// NOTE(review): presumably the configured stream, the shard stream the
	// message was read from, and the effective stream — confirm.
	BaseStream, ShardStream, Stream string

	// NOTE(review): presumably the stream entry ID, the consumer group and
	// consumer name that received it, and the ordering key — confirm.
	ID, Group, Consumer, LogicalKey string

	Shard int

	// NOTE(review): DeliveryCount and Claimed presumably describe redelivery
	// of a pending entry — confirm against the implementation.
	DeliveryCount int64
	Claimed       bool

	ReceivedAt time.Time
}
MessageContext contains per-message metadata passed to the business handler.
type PublishResult ¶
// PublishResult is one successful publish result.
type PublishResult struct {
	// NOTE(review): presumably the configured stream and the stream actually
	// written to (which may be a shard stream) — confirm.
	BaseStream, Stream string
	Shard              int
	// NOTE(review): presumably the entry ID assigned by Redis — confirm.
	ID        string
	Published time.Time
}
PublishResult is one successful publish result.
type Publisher ¶
// Publisher wraps Redis Streams publish helpers.
// Instances are obtained from NewPublisher; the struct exposes no public
// fields.
type Publisher struct {
// contains filtered or unexported fields
}
Publisher wraps Redis Streams publish helpers.
func NewPublisher ¶
func NewPublisher(client redis.UniversalClient, cfg PublisherConfig) (*Publisher, error)
NewPublisher creates a configured publisher wrapper.
func (*Publisher) PublishBatch ¶
func (p *Publisher) PublishBatch(ctx context.Context, msgs ...*Message) ([]*PublishResult, error)
PublishBatch sends messages sequentially and fails fast on first error.
type PublisherConfig ¶
// PublisherConfig configures Publisher. It is passed by value to
// NewPublisher, and Validate validates and normalizes it.
type PublisherConfig struct {
	// DefaultStream is set to "jobs" in the package example.
	// NOTE(review): presumably the stream used when a message names none —
	// confirm. The three naming fields also exist on ConsumerConfig.
	DefaultStream, PayloadField, HeaderPrefix, OrderKeyHeader string

	// OrderedShardCount is set to 4 in the package example, matching the
	// consumer's value.
	OrderedShardCount int
	ShardStreamPrefix string
}
PublisherConfig configures Publisher.
func (*PublisherConfig) Validate ¶
func (cfg *PublisherConfig) Validate() error
Validate validates and normalizes publisher config.