mq

package
v0.0.0-...-8c522c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package mq provides a lightweight, Redis Streams-based message queue wrapper built on top of go-redis/v9 consumer groups.

Example (PublishAndConsume)
package main

import (
	"context"
	"fmt"
	"hash/fnv"
	"time"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"

	"github.com/codesjoy/pkg/basic/xredis/mq"
)

// main publishes one ordered message and consumes it again, using an
// in-memory miniredis server so the example needs no external Redis.
//
// The publisher is constructed exactly once, with the same OrderedShardCount
// as the consumer. The consumer owns only the shard that mqTestShard computes
// for the "user-42" order key.
func main() {
	mr, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer mr.Close()

	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
	defer client.Close()

	consumer, err := mq.NewConsumer(client, mq.ConsumerConfig{
		Stream:            "jobs",
		Group:             "workers",
		Consumer:          "worker-1",
		AutoCreateGroup:   true,
		OrderedShardCount: 4,
		OwnedShards:       []int{mqTestShard("user-42", 4)},
		Block:             10 * time.Millisecond,
		IdleBackoff:       5 * time.Millisecond,
		AutoClaimMinIdle:  time.Second,
	})
	if err != nil {
		panic(err)
	}

	publisher, err := mq.NewPublisher(client, mq.PublisherConfig{
		DefaultStream:     "jobs",
		OrderedShardCount: 4,
	})
	if err != nil {
		panic(err)
	}

	if _, err := publisher.Publish(context.Background(), &mq.Message{
		Payload: []byte("send-email"),
		Headers: map[string]string{
			"kind":        "welcome",
			"x-order-key": "user-42",
		},
	}); err != nil {
		panic(err)
	}

	// Consume's result is deliberately discarded: the handler closes the
	// consumer after the first message, which is how this example ends the loop.
	_ = consumer.Consume(
		context.Background(),
		func(_ context.Context, msg *mq.MessageContext) error {
			fmt.Printf("%s %s\n", msg.Message.Payload, msg.Message.Headers["kind"])
			return consumer.Close()
		},
	)

}

// mqTestShard maps key to a shard index by taking the 32-bit FNV-1a hash of
// the key's bytes modulo shardCount.
func mqTestShard(key string, shardCount int) int {
	hasher := fnv.New32a()
	// The write error is discarded, exactly as in the original helper.
	_, _ = hasher.Write([]byte(key))

	sum := hasher.Sum32()
	buckets := uint32(shardCount)
	return int(sum % buckets)
}
Output:
send-email welcome

Index

Examples

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by package mq. Each value is created once with
// errors.New, so callers can match a returned error with errors.Is.
var (
	// ErrNilClient indicates the redis client is nil.
	ErrNilClient = errors.New("mq redis client is nil")
	// ErrNilPublisher indicates the publisher receiver is nil.
	ErrNilPublisher = errors.New("mq publisher is nil")
	// ErrNilConsumer indicates the consumer receiver is nil.
	ErrNilConsumer = errors.New("mq consumer is nil")
	// ErrNilMessage indicates the message is nil.
	ErrNilMessage = errors.New("mq message is nil")
	// ErrNilHandlerFunc indicates the final consumer handler is nil.
	ErrNilHandlerFunc = errors.New("mq handler is nil")
	// ErrMessageStreamRequired indicates the message stream is empty.
	ErrMessageStreamRequired = errors.New("mq message stream is required")
	// ErrConsumerStreamRequired indicates the consumer stream is empty.
	ErrConsumerStreamRequired = errors.New("mq consumer stream is required")
	// ErrConsumerGroupRequired indicates the consumer group is empty.
	ErrConsumerGroupRequired = errors.New("mq consumer group is required")
	// ErrConsumerNameRequired indicates the consumer name is empty.
	ErrConsumerNameRequired = errors.New("mq consumer name is required")
	// ErrConsumerClosed indicates the consumer has already been closed.
	ErrConsumerClosed = errors.New("mq consumer is closed")
	// ErrConsumerActive indicates the consumer is already running.
	ErrConsumerActive = errors.New("mq consumer is already running")
)

Functions

This section is empty.

Types

type Consumer

// Consumer wraps one Redis Streams consumer group loop.
// Instances are obtained from NewConsumer; the fields are not exported.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps one Redis Streams consumer group loop.

func NewConsumer

func NewConsumer(client redis.UniversalClient, cfg ConsumerConfig) (*Consumer, error)

NewConsumer creates a configured consumer wrapper.

func (*Consumer) Close

func (c *Consumer) Close() error

Close marks the consumer as closed and stops future Consume calls.

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler HandlerFunc) error

Consume starts consuming until ctx is done, Close is called, or handler returns an error.

type ConsumerConfig

// ConsumerConfig configures Consumer.
//
// NOTE(review): Stream, Group and Consumer look mandatory — the package
// declares ErrConsumerStreamRequired, ErrConsumerGroupRequired and
// ErrConsumerNameRequired for empty values — confirm against Validate.
// The remaining fields are presumably optional and defaulted by Validate,
// which is documented as normalizing the config; verify before relying on it.
type ConsumerConfig struct {
	Stream            string
	Group             string
	Consumer          string
	Block             time.Duration
	Count             int64
	AutoCreateGroup   bool
	GroupStartID      string
	PayloadField      string
	HeaderPrefix      string
	OrderKeyHeader    string
	ShardCount        int
	ShardQueueSize    int
	OrderedShardCount int
	ShardStreamPrefix string
	OwnedShards       []int
	AutoClaimMinIdle  time.Duration
	AutoClaimCount    int64
	IdleBackoff       time.Duration
}

ConsumerConfig configures Consumer.

func (*ConsumerConfig) Validate

func (cfg *ConsumerConfig) Validate() error

Validate validates and normalizes consumer config.

type HandlerFunc

// HandlerFunc handles one consumed message.
// Consume is documented to stop when the handler returns an error.
type HandlerFunc func(context.Context, *MessageContext) error

HandlerFunc handles one consumed message.

type Message

// Message is one logical Redis Streams message.
type Message struct {
	// NOTE(review): the package example leaves Stream empty and sets
	// PublisherConfig.DefaultStream instead; presumably the default applies
	// when Stream is empty — confirm against Publish.
	Stream  string
	Payload []byte
	Headers map[string]string
}

Message is one logical Redis Streams message.

type MessageContext

// MessageContext contains per-message metadata passed to the business handler.
type MessageContext struct {
	// Message is the consumed message; the package example reads its Payload
	// and Headers inside the handler.
	Message       *Message
	BaseStream    string
	ShardStream   string
	Stream        string
	ID            string
	Group         string
	Consumer      string
	LogicalKey    string
	Shard         int
	DeliveryCount int64
	Claimed       bool
	ReceivedAt    time.Time
}

MessageContext contains per-message metadata passed to the business handler.

type PublishResult

// PublishResult is one successful publish result, as returned by
// Publisher.Publish and Publisher.PublishBatch.
type PublishResult struct {
	BaseStream string
	Stream     string
	Shard      int
	ID         string
	Published  time.Time
}

PublishResult is one successful publish result.

type Publisher

// Publisher wraps Redis Streams publish helpers.
// Instances are obtained from NewPublisher; the fields are not exported.
type Publisher struct {
	// contains filtered or unexported fields
}

Publisher wraps Redis Streams publish helpers.

func NewPublisher

func NewPublisher(client redis.UniversalClient, cfg PublisherConfig) (*Publisher, error)

NewPublisher creates a configured publisher wrapper.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, msg *Message) (*PublishResult, error)

Publish sends one message synchronously.

func (*Publisher) PublishBatch

func (p *Publisher) PublishBatch(
	ctx context.Context,
	msgs ...*Message,
) ([]*PublishResult, error)

PublishBatch sends messages sequentially and fails fast on first error.

type PublisherConfig

// PublisherConfig configures Publisher.
//
// NOTE(review): DefaultStream is presumably the stream used for messages that
// do not set Message.Stream (the package example relies on it) — confirm
// against Publish. Validate is documented as normalizing this config, so the
// other fields may be defaulted there; verify before relying on it.
type PublisherConfig struct {
	DefaultStream     string
	PayloadField      string
	HeaderPrefix      string
	OrderKeyHeader    string
	OrderedShardCount int
	ShardStreamPrefix string
}

PublisherConfig configures Publisher.

func (*PublisherConfig) Validate

func (cfg *PublisherConfig) Validate() error

Validate validates and normalizes publisher config.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL