redisq

package
v0.3.0
Published: Aug 11, 2025 License: MIT Imports: 15 Imported by: 0

README

redisq

Package redisq provides a Redis Stream-based message queue.

Max length of stream

The JSON object below is 360 characters long when serialized using JSON.stringify(), so 0.5 KB is a reasonable upper estimate for a typical message.
Assume each message is 0.5 KB. With one message in each of 100 streams, this amounts to approximately 50 KB.
With 10,000 messages per stream (the default MaxLen), the total size would be around 500 MB.

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}
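
The estimate above can be reproduced with a few lines of Go. This is a minimal sketch: the helper name estimateBytes is hypothetical and not part of redisq, 1 KB is taken as 1000 bytes, and only message payloads are counted, not Redis's own per-entry overhead.

```go
package main

import "fmt"

// estimateBytes returns the approximate payload size held across all streams,
// assuming every stream holds perStream messages of msgBytes each.
// It is a hypothetical helper for illustration, not part of redisq.
func estimateBytes(streams, perStream, msgBytes int64) int64 {
	return streams * perStream * msgBytes
}

func main() {
	const msgBytes = 500 // 0.5 KB per message, with 1 KB = 1000 bytes

	one := estimateBytes(100, 1, msgBytes)      // one message in each of 100 streams
	full := estimateBytes(100, 10000, msgBytes) // every stream filled to 10,000 messages

	fmt.Printf("one message per stream: %d KB\n", one/1000)
	fmt.Printf("10,000 messages per stream: %d MB\n", full/1_000_000)
}
```

Lowering the per-stream limit scales the total linearly, so a limit of 1,000 messages would bring the same workload down to about 50 MB.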

Documentation

Overview

Package redisq provides a Redis Stream-based message queue.

Constants

const (
	MIMEJSON = "application/json"
)

Variables

var (
	MaxLen int64 = 10000 // See the README for details
)
var Panicked = errors.New("panicked")

Panicked is a sentinel error for panics.

Functions

func MustAddBatchHandler

func MustAddBatchHandler[T any](
	c *Consumer,
	r *Route,
	h func(ctx Context, ms []*M[T]) error,
)

func MustAddHandler

func MustAddHandler[T any](
	c *Consumer,
	r *Route,
	h func(ctx Context, m *M[T]) error,
)

func Publish

func Publish[T any](ctx context.Context, rdb *redis.Client, stream string, m *M[T]) error

Publish publishes a new message to the given stream.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a Redis Stream-based message queue consumer.

func NewConsumer

func NewConsumer(c *redis.Client, l *slog.Logger, opts ...Option) (res *Consumer)

func (*Consumer) MustAddRoute

func (c *Consumer) MustAddRoute(r *Route)

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

func (*Consumer) Stop

func (c *Consumer) Stop(ctx context.Context) error

type Context

type Context interface {
	context.Context
	WithContext(context.Context) Context
	Route() Route
	// IsBatch returns true if the route is a batch route.
	IsBatch() bool
	// Msg returns the first message in the current context.
	Msg() RM
	// Msgs returns all messages in the current context.
	Msgs() []RM
	// Ack acknowledges the messages.
	// If no IDs are provided, all messages will be acknowledged when the error is nil.
	Ack(ids ...string)
	// contains filtered or unexported methods
}

type Handler

type Handler func(ctx Context) error

type M

type M[T any] struct {
	queue.M
	T T
}

func NewM

func NewM[T any](t T) *M[T]

NewM creates a new default message.

type Middleware

type Middleware func(Handler) Handler

func Chain

func Chain(m ...Middleware) Middleware

Chain creates a single Middleware by chaining multiple Middleware functions.

func Recover

func Recover(l *slog.Logger) Middleware

func Tracing

func Tracing() Middleware

type Option

type Option func(*Consumer)

func WithMiddlewares

func WithMiddlewares(mws ...Middleware) Option
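
Option has the shape of the functional options pattern: each option is a function that mutates the consumer during construction. The sketch below illustrates that pattern with local stand-in types. The names consumer, option, withMiddlewares, and newConsumer are hypothetical, middlewares are represented as plain strings, and whether the real WithMiddlewares appends or replaces is an assumption here.

```go
package main

import "fmt"

// consumer is a local stand-in that only tracks middleware names.
type consumer struct {
	middlewares []string
}

// option mirrors the shape of a functional option: a function that
// configures the consumer it is given.
type option func(*consumer)

// withMiddlewares appends the given names to the consumer's middleware list.
// Appending (rather than replacing) is an assumption of this sketch.
func withMiddlewares(names ...string) option {
	return func(c *consumer) {
		c.middlewares = append(c.middlewares, names...)
	}
}

// newConsumer applies each option in order to a zero-value consumer.
func newConsumer(opts ...option) *consumer {
	c := &consumer{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConsumer(withMiddlewares("recover", "tracing"))
	fmt.Println(c.middlewares) // prints [recover tracing]
}
```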

type RM

type RM struct {
	ID     string
	Values map[string]any
}

RM is the raw Redis message.

func (RM) GetStr

func (m RM) GetStr(key string) string
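
GetStr is undocumented, so the following is only a plausible sketch of a string accessor over a map[string]any. The type rm and method getStr are local stand-ins, and returning an empty string for missing or non-string values is an assumption, not confirmed behavior.

```go
package main

import "fmt"

// rm is a local stand-in with the same fields as RM.
type rm struct {
	ID     string
	Values map[string]any
}

// getStr returns the value stored under key when it is a string,
// and "" when the key is missing or holds another type (assumed behavior).
func (m rm) getStr(key string) string {
	s, _ := m.Values[key].(string)
	return s
}

func main() {
	m := rm{ID: "1-0", Values: map[string]any{"type": "greeting", "n": 42}}
	fmt.Printf("%q %q %q\n", m.getStr("type"), m.getStr("n"), m.getStr("missing"))
	// prints "greeting" "" ""
}
```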

type Route

type Route struct {
	Stream    string
	Group     string
	PendingID string  // The start ID for pending messages, default is "0"
	Handler   Handler // Handler is the message handler
	NoPending bool    // NoPending ignores the pending messages
	BatchSize int64   // BatchSize specifies the number of messages fetched per batch
	MaxLen    int64   // MaxLen specifies the max length of current stream
}

func (*Route) SpanName

func (r *Route) SpanName() string

SpanName is the name of the span for tracing.
