redstream

Version: v1.1.4
Published: Jan 19, 2025 License: MIT Imports: 17 Imported by: 0

README

Redis Stream

A Go library for robust Redis Streams consumption with:

  1. Consumer Groups for once-per-message processing.
  2. Redsync locks to prevent duplicates across multiple workers.
  3. Auto-reclaim (XAUTOCLAIM) to recover stuck pending messages.
  4. Optional ephemeral “publish lock” to skip concurrent duplicates.
  5. Exponential backoff for repeated failures.
  6. Dead Letter Queue (DLQ) support for over-limit messages.
  7. Universal Client for single-node, cluster, or sentinel setups.
  8. Concurrency Limit to throttle simultaneous message handling.
  9. Automatic stale-consumer removal (clears out “dead” consumers to reclaim messages).
  10. Auto-rejoin if the current consumer was forcibly removed from the group.

Installation

go get github.com/smarter-day/redstream

Quick Usage Example

Below is a simplified demonstration:

package main

import (
    "context"
    "log"

    "github.com/redis/go-redis/v9"
    "github.com/smarter-day/redstream"
)

func main() {
    ctx := context.Background()

    cfg := redstream.Config{
        StreamName:         "myStream",
        GroupName:          "myGroup",
        ConsumerName:       "myConsumer",
        EnableReclaim:      true,
        ReclaimStr:         "5s",
        ReclaimCount:       10,
        MaxReclaimAttempts: 3,
        MaxConcurrency:     5, // optional concurrency limit
        // Optional DLQ handler for messages that exceed attempts
        DLQHandler: func(ctx context.Context, xMsg *redis.XMessage) error {
            log.Printf("[DLQ] message ID=%s\n", xMsg.ID)
            return nil
        },
        // (Optional) Auto-remove other stale consumers, rejoin if we're removed
        EnableStaleConsumerCleanup: true,
        EnableAutoRejoinOnRemoved:  true,
    }

    // For single node, cluster, or sentinel
    uniOpts := &redis.UniversalOptions{Addrs: []string{"localhost:6379"}}
    stream := redstream.New(uniOpts, cfg)

    // Register your message handler
    stream.RegisterHandler(func(ctx context.Context, fields map[string]string) error {
        // Return an error to test the backoff/reclaim. Otherwise, handle your data here.
        return nil
    })

    if err := stream.StartConsumer(ctx); err != nil {
        log.Fatalf("Cannot start consumer: %v", err)
    }

    // Publish
    msgID, err := stream.Publish(ctx, map[string]any{"foo": "bar"})
    if err != nil {
        log.Println("Publish error:", err)
    } else {
        log.Println("Published message ID:", msgID)
    }

    // Block forever
    select {}
}

High-Level Flow

  1. Publish: Uses XADD. If DropConcurrentDuplicates=true, a short Redsync lock can skip near-simultaneous duplicates.
  2. Consume: Each message is read with XREADGROUP. On success, the message is acknowledged and deleted (XACK + XDEL); on failure, the attempt is recorded and a backoff is applied. If UseDistributedLock=true, only one node processes a given message at a time.
  3. Reclaim: A background loop calls XAUTOCLAIM to reclaim messages stuck in PEL. Retries until MaxReclaimAttempts.
  4. Stale Consumers: If EnableStaleConsumerCleanup=true, periodically removes consumer names that have been idle beyond StaleConsumerIdleThresholdStr, freeing their stuck messages for reclamation.
  5. Auto-Rejoin: If EnableAutoRejoinOnRemoved=true, a consumer that detects it’s been removed by another node automatically re-joins the group.
  6. DLQ: Messages exceeding MaxReclaimAttempts go to DLQHandler (if set). If IgnoreDLQHandlerErrors=false, a message whose DLQ handler returns an error remains pending.
  7. Concurrency Limit: With MaxConcurrency>0, each consumer instance only processes that many messages simultaneously.
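
The concurrency limit in step 7 can be pictured as a counting semaphore. The sketch below is not the library's code: runLimited is a hypothetical helper that uses a buffered channel so that at most `limit` handlers run at once, and it reports the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited is a hypothetical helper (not part of redstream). It calls handle
// once per message and never runs more than limit handlers at the same time.
// It returns the highest number of handlers that were in flight together.
func runLimited(limit int, msgs []string, handle func(string)) int32 {
	sem := make(chan struct{}, limit) // one slot per allowed in-flight handler
	var wg sync.WaitGroup
	var inFlight, peak int32

	for _, m := range msgs {
		sem <- struct{}{} // blocks while limit handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the handler is done

			// Track the peak so callers can verify the limit held.
			n := atomic.AddInt32(&inFlight, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			handle(m)
			atomic.AddInt32(&inFlight, -1)
		}(m)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e", "f"}
	peak := runLimited(2, msgs, func(string) {})
	fmt.Println("peak concurrency within limit:", peak <= 2)
}
```

Acquiring the slot before starting the goroutine means the reader itself slows down when all slots are busy, which is usually the desired backpressure.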

Configuration

Configure via redstream.Config plus *redis.UniversalOptions. Key fields include:

| Field | Default | Description |
| --- | --- | --- |
| StreamName | required | Redis stream name. |
| GroupName | required | Consumer group name (auto-created at 0-0 if missing). |
| ConsumerName | auto-generated | Must be unique across consumers. |
| EnableReclaim | false | Whether to run reclaim logic (XAUTOCLAIM). |
| ReclaimStr | "5s" | Frequency of auto-reclaim checks. |
| MaxReclaimAttempts | 3 | After this many fails, remove or DLQ the message. |
| DLQHandler | nil | Callback for messages that exceed MaxReclaimAttempts. |
| IgnoreDLQHandlerErrors | false | If true, remove message even if DLQ fails. |
| DropConcurrentDuplicates | false | Adds a brief Redsync lock on Publish to skip duplicates. |
| MaxConcurrency | 10 | If >0, concurrency is throttled in processMessage. |
| StaleConsumerIdleThresholdStr | "2m" | Consumers idle longer than this are considered “stale” (if EnableStaleConsumerCleanup=true). |
| EnableStaleConsumerCleanup | false | If true, we remove “stale” consumers, letting a healthy consumer reclaim their messages. |
| EnableAutoRejoinOnRemoved | false | If a consumer sees it was removed, it automatically re-joins the group. |
| AutoRejoinCheckIntervalStr | "30s" | Frequency for checking if our consumer name still exists in the group. |
| UseRedisIdAsUniqueID | false | If true, use the Redis XMessage.ID for dedup. Otherwise, compute sha256 from message payload. |

Deeper Explanation & Best Practices

  • Universal Client:
    redis.NewUniversalClient(...) works for single-node, cluster, or sentinel modes.
  • Redsync Locks:
    With UseDistributedLock=true, each message is locked so only one node processes it at a time.
    With DropConcurrentDuplicates=true, Publish also uses a tiny ephemeral lock.
  • Auto-Reclaim:
    Recovers messages left in the Pending Entries List if a consumer crashed.
  • Exponential Backoff:
    Each re-failed message remains in PEL; we store a “nextBackoffSec” so we skip it until it’s ready again.
  • DLQ:
    Once a message passes MaxReclaimAttempts, it’s removed from the stream. Optionally, DLQHandler is called.
  • Stale Consumers:
    If a consumer remains idle beyond StaleConsumerIdleThresholdStr, we run XGROUP DELCONSUMER, letting other consumers reclaim messages.
    A forcibly removed consumer can rejoin if EnableAutoRejoinOnRemoved=true.
  • Fallback:
    If a Lua script fails (e.g. “NOSCRIPT”), you can do a forced XACK+XDEL to avoid duplicates stuck in pending.
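
To make the exponential backoff idea concrete, here is a minimal sketch of a capped doubling schedule. The README does not state the formula the library uses, so backoffDelay and its doubling rule are assumptions for illustration only; the maxFactor parameter merely mirrors the idea behind the ReclaimMaxExponentialFactor field.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay is a hypothetical helper (not part of redstream). It returns
// base multiplied by 2^(attempt-1), with the multiplier capped at maxFactor.
func backoffDelay(base time.Duration, attempt, maxFactor int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	if maxFactor < 1 {
		maxFactor = 1
	}
	factor := 1
	// Double once per previous failure, stopping as soon as the cap is reached.
	for i := 1; i < attempt && factor < maxFactor; i++ {
		factor *= 2
	}
	if factor > maxFactor {
		factor = maxFactor
	}
	return base * time.Duration(factor)
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, backoffDelay(5*time.Second, attempt, 8))
	}
}
```

With a 5s base and a cap of 8, the waits are 5s, 10s, 20s, 40s, and then 40s for every later attempt.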

License

MIT License – open for adaptation & improvement. Issues and contributions are always welcome!

Documentation

Constants

const (
	RedisKeysPrefix = "redstream"
)

Variables

var (
	Validate *validator.Validate
)

Functions

func CreateGroupIfNotExists

func CreateGroupIfNotExists(client redis.Cmdable, stream, group string) error

CreateGroupIfNotExists attempts to create a consumer group for a Redis stream if it doesn't already exist. It uses the XGroupCreateMkStream command, which creates both the stream and the group if they don't exist.

Parameters:

  • client: A redis.Cmdable (for example, a *redis.Client) used to execute Redis commands.
  • stream: The name of the Redis stream to create or use.
  • group: The name of the consumer group to create.

Returns:

  • error: nil if the group was created successfully or already exists, otherwise returns an error.
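
The "already exists" case deserves a note: Redis answers XGROUP CREATE for an existing group with an error whose message starts with BUSYGROUP, so a create-if-missing helper has to treat that reply as success. The sketch below shows only that error-filtering step. ignoreBusyGroup and redisErr are hypothetical names, and how the library itself detects the condition is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// redisErr is a hypothetical stand-in for an error returned by a Redis client.
type redisErr string

func (e redisErr) Error() string { return string(e) }

// ignoreBusyGroup is a hypothetical helper (not part of redstream). It treats
// the BUSYGROUP reply, which means the group already exists, as success and
// passes every other error through unchanged.
func ignoreBusyGroup(err error) error {
	if err != nil && strings.Contains(err.Error(), "BUSYGROUP") {
		return nil
	}
	return err
}

func main() {
	fmt.Println(ignoreBusyGroup(redisErr("BUSYGROUP Consumer Group name already exists")))
	fmt.Println(ignoreBusyGroup(redisErr("NOAUTH Authentication required.")))
}
```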

func ParseDurationOrDefault

func ParseDurationOrDefault(s *string, def time.Duration) time.Duration

ParseDurationOrDefault attempts to parse a string into a time.Duration. If parsing fails or results in a non-positive duration, it returns the default value.

Parameters:

  • s: A pointer to a string representation of a duration (e.g., "5s", "1m30s").
  • def: The default duration to return if parsing fails or results in a non-positive value.

Returns:

A time.Duration parsed from the input string, or the default value if parsing fails or the result is non-positive.
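
The behavior described above can be sketched with the standard library alone. This is a re-implementation for illustration, not the package's source: parseDurationOrDefault is a hypothetical lowercase twin, and returning the default for a nil pointer is an assumption, since the documentation does not say how nil is handled.

```go
package main

import (
	"fmt"
	"time"
)

// parseDurationOrDefault is a hypothetical sketch of the documented behavior.
// Assumption: a nil pointer is treated like an unparsable string.
func parseDurationOrDefault(s *string, def time.Duration) time.Duration {
	if s == nil {
		return def
	}
	d, err := time.ParseDuration(*s)
	if err != nil || d <= 0 {
		return def // parse failure or non-positive duration
	}
	return d
}

func main() {
	good, bad, negative := "1m30s", "soon", "-5s"
	def := 5 * time.Second

	fmt.Println(parseDurationOrDefault(&good, def))
	fmt.Println(parseDurationOrDefault(&bad, def))
	fmt.Println(parseDurationOrDefault(&negative, def))
	fmt.Println(parseDurationOrDefault(nil, def))
}
```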

func UniqueConsumerName

func UniqueConsumerName(base string) string

UniqueConsumerName generates a unique consumer name for use in Redis streams. It combines a base name with the hostname, process ID, and a random suffix to ensure uniqueness across different instances and executions.

Parameters:

  • base: A string that serves as the prefix for the generated consumer name.

Returns:

A string representing a unique consumer name in the format:
"<base>-<hostname>-<pid>-<random_suffix>"

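As an illustration of the name format above, the following sketch builds a name from the same four parts. It is not the library's implementation: uniqueConsumerName is a hypothetical helper, and the 4-byte hex suffix and the "unknown-host" fallback are assumptions.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
)

// uniqueConsumerName is a hypothetical sketch that produces
// "<base>-<hostname>-<pid>-<random_suffix>".
func uniqueConsumerName(base string) string {
	host, err := os.Hostname()
	if err != nil || host == "" {
		host = "unknown-host" // assumption: fallback when the hostname is unavailable
	}
	suffix := make([]byte, 4)
	if _, err := rand.Read(suffix); err != nil {
		// Without randomness, fall back to the host and PID parts only.
		return fmt.Sprintf("%s-%s-%d", base, host, os.Getpid())
	}
	return fmt.Sprintf("%s-%s-%d-%s", base, host, os.Getpid(), hex.EncodeToString(suffix))
}

func main() {
	fmt.Println(uniqueConsumerName("worker"))
	fmt.Println(uniqueConsumerName("worker"))
}
```

The random suffix matters when several consumers start in the same process, where the hostname and PID are identical.
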
func ValidateDuration

func ValidateDuration(fl validator.FieldLevel) bool

ValidateDuration checks if a given field can be parsed as a valid duration.

It takes a validator.FieldLevel as an argument, which provides access to the field being validated and its metadata.

Parameters:

  • fl: A validator.FieldLevel interface that allows access to the field being validated, as well as the struct it belongs to.

Returns:

  • bool: true if the field can be parsed as a valid duration, false otherwise.

Types

type Config

type Config struct {
	StreamName   string `validate:"required"`
	GroupName    string `validate:"required"`
	ConsumerName string `validate:"required"`

	MaxConcurrency      int
	UseDistributedLock  bool // if false => skip the redsync-based lock
	NoProgressThreshold int  // after how many consecutive zero-reclaims do we call readPendingMessagesOnce

	LockExpiryStr    string `validate:"duration"`
	LockExtendStr    string `validate:"duration"`
	BlockDurationStr string `validate:"duration"`

	EnableReclaim               bool
	ReclaimStr                  string `validate:"duration"`
	CleanOldReclaimDuration     string `validate:"duration"`
	MaxReclaimAttempts          int
	ReclaimCount                int64
	ReclaimMaxExponentialFactor int

	DLQHandler               DLQHandlerFn
	IgnoreDLQHandlerErrors   bool
	DropConcurrentDuplicates bool

	ProcessedIdsMaxAgeStr string `validate:"duration"`
	UseRedisIdAsUniqueID  bool

	StaleConsumerIdleThresholdStr string `validate:"duration"`
	EnableStaleConsumerCleanup    bool

	EnableAutoRejoinOnRemoved  bool
	AutoRejoinCheckIntervalStr string `validate:"duration"`
}

Config represents the configuration for the RedStream.

type DLQHandlerFn

type DLQHandlerFn func(ctx context.Context, msg *redis.XMessage) error

type IRedStream

type IRedStream interface {
	StartConsumer(ctx context.Context) error
	RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)
	Publish(ctx context.Context, data any) (string, error)
	HealthCheck(ctx context.Context) error

	UseDebug(fn LogFn)
	UseInfo(fn LogFn)
	UseError(fn LogFn)
}
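
Because IRedStream is an interface, application code can depend on just the methods it needs and substitute a fake in unit tests. The sketch below assumes nothing beyond the Publish signature shown above; publisher, fakePublisher, notifySignup, and demo are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// publisher is the subset of IRedStream that this application code needs.
// Any IRedStream value satisfies it because the method signature matches.
type publisher interface {
	Publish(ctx context.Context, data any) (string, error)
}

// fakePublisher records payloads instead of talking to Redis.
type fakePublisher struct {
	published []any
}

func (f *fakePublisher) Publish(_ context.Context, data any) (string, error) {
	f.published = append(f.published, data)
	return fmt.Sprintf("0-%d", len(f.published)), nil // made-up message ID
}

// notifySignup is hypothetical application code that publishes an event.
func notifySignup(ctx context.Context, p publisher, email string) (string, error) {
	return p.Publish(ctx, map[string]any{"event": "signup", "email": email})
}

// demo exercises notifySignup against the fake and reports what was recorded.
func demo() (id string, recorded int, err error) {
	fake := &fakePublisher{}
	id, err = notifySignup(context.Background(), fake, "a@example.com")
	return id, len(fake.published), err
}

func main() {
	id, recorded, err := demo()
	fmt.Println(id, recorded, err)
}
```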

func New

func New(redisOptions *redis.UniversalOptions, cfg Config) IRedStream

New creates and initializes a new IRedStream instance.

It sets up the Redis client, configures distributed locking if enabled, parses and validates configuration values, creates the consumer group if it doesn't exist, and registers Lua scripts.

Parameters:

  • redisOptions: A pointer to redis.UniversalOptions containing Redis connection settings.
  • cfg: A Config struct containing the configuration for the RedStream.

Returns:

  • An IRedStream interface implementation. If there's an error during initialization, it returns nil and logs a fatal error.

type LogFn

type LogFn func(ctx context.Context, args ...interface{}) error
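
A logger with this shape is easy to build on top of any io.Writer. In the sketch below, the local LogFn type mirrors the signature above so the example compiles on its own; writerLogFn and render are hypothetical helpers. In real code the function would be declared as the package's own LogFn type and handed to UseDebug, UseInfo, or UseError.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
)

// LogFn mirrors the signature documented above (local copy for this sketch).
type LogFn func(ctx context.Context, args ...interface{}) error

// writerLogFn is a hypothetical helper that returns a LogFn writing one line
// per call to w, prefixed with the given level.
func writerLogFn(w io.Writer, level string) LogFn {
	return func(_ context.Context, args ...interface{}) error {
		line := append([]interface{}{"[" + level + "]"}, args...)
		_, err := fmt.Fprintln(w, line...)
		return err
	}
}

// render captures what a single log call would write, for inspection.
func render(level string, args ...interface{}) string {
	var buf bytes.Buffer
	_ = writerLogFn(&buf, level)(context.Background(), args...)
	return buf.String()
}

func main() {
	debug := writerLogFn(os.Stderr, "DEBUG")
	_ = debug(context.Background(), "consumer started:", "myConsumer")
	fmt.Print(render("INFO", "published", 3, "messages"))
}
```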

type RedisStream

type RedisStream struct {
	Client redis.UniversalClient
	Rs     *redsync.Redsync
	Cfg    Config

	LockExpiry              time.Duration
	LockExtend              time.Duration
	BlockDuration           time.Duration
	ReclaimInterval         time.Duration
	CleanOldReclaimDuration time.Duration
	ProcessedIdsMaxAge      time.Duration
	// contains filtered or unexported fields
}

RedisStream is the main struct implementing IRedStream.

func (*RedisStream) HealthCheck

func (r *RedisStream) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the Redis connection by sending a PING command. It verifies if the Redis server is responsive and the connection is active.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.

Returns:

  • error: An error if the PING command fails or the connection is not healthy, nil if the health check is successful.
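
One common use for a check like this is an HTTP readiness probe. The sketch below depends only on the HealthCheck signature shown above; healthChecker, readyHandler, stubChecker, probe, and the /readyz path are hypothetical names, and the one-second timeout is an arbitrary choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// healthChecker is the one method of IRedStream that the probe needs.
type healthChecker interface {
	HealthCheck(ctx context.Context) error
}

// readyHandler answers 200 when the check passes and 503 when it fails or
// does not finish within timeout.
func readyHandler(hc healthChecker, timeout time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), timeout)
		defer cancel()
		if err := hc.HealthCheck(ctx); err != nil {
			http.Error(w, "redis unavailable", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// stubChecker stands in for a real stream so the sketch runs without Redis.
type stubChecker struct{ err error }

func (s stubChecker) HealthCheck(context.Context) error { return s.err }

var errDown = errors.New("connection refused")

// probe sends one request through the handler and returns the status code.
func probe(hc healthChecker) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/readyz", nil)
	readyHandler(hc, time.Second)(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probe(stubChecker{}))
	fmt.Println(probe(stubChecker{err: errDown}))
}
```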

func (*RedisStream) Publish

func (r *RedisStream) Publish(ctx context.Context, data any) (string, error)

Publish adds a new message to the Redis stream. It optionally checks for and drops concurrent duplicates.

The function marshals the provided data into JSON, optionally acquires a lock to prevent concurrent duplicates, and then adds the message to the stream using Redis XADD command.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.
  • data: Any data type that can be marshaled into JSON to be published as a message.

Returns:

  • string: The ID of the published message in the Redis stream. Empty if skipped due to duplication.
  • error: An error if any step in the publishing process fails, nil otherwise.
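
Since Publish marshals the data to JSON, and the configuration table mentions a sha256 computed from the message payload when UseRedisIdAsUniqueID is false, a content-based dedup key can be sketched as follows. payloadKey is a hypothetical helper; the exact bytes the library hashes and the key format it stores are not documented here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// payloadKey is a hypothetical helper (not part of redstream). It returns the
// hex-encoded SHA-256 of the JSON encoding of data.
func payloadKey(data any) (string, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(raw)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	key, err := payloadKey(map[string]any{"foo": "bar"})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(key)
}
```

encoding/json writes map keys in sorted order, so two maps with the same entries hash to the same key regardless of insertion order. Struct payloads hash according to their field order.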

func (*RedisStream) RegisterHandler

func (r *RedisStream) RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)

RegisterHandler sets the handler for new messages.

func (*RedisStream) StartConsumer

func (r *RedisStream) StartConsumer(ctx context.Context) error

StartConsumer initiates the consumer process for the Redis stream. It starts goroutines for listening to new messages, reclaiming pending messages (if enabled), and performing periodic cleanup of processed message IDs.

This function sets up the necessary channels and goroutines to handle message processing concurrency, new message consumption, and maintenance tasks.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.

Returns:

  • error: An error if the handler function is not set, nil otherwise.
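
Since StartConsumer takes a context and returns once its goroutines are running, a program can wait on that context instead of blocking on an empty select. The sketch below assumes the background goroutines stop when the context is cancelled, which this page implies but does not state outright. consumer, runUntilDone, noopConsumer, and demoRun are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// consumer is the one method of IRedStream that this sketch needs.
type consumer interface {
	StartConsumer(ctx context.Context) error
}

// runUntilDone starts the consumer and blocks until ctx is cancelled.
func runUntilDone(ctx context.Context, c consumer) error {
	if err := c.StartConsumer(ctx); err != nil {
		return err
	}
	<-ctx.Done()
	return nil
}

// noopConsumer stands in for a real stream so the sketch runs without Redis.
type noopConsumer struct{ started bool }

func (n *noopConsumer) StartConsumer(context.Context) error {
	n.started = true
	return nil
}

// demoRun simulates a shutdown that arrives immediately.
func demoRun() (bool, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	c := &noopConsumer{}
	err := runUntilDone(ctx, c)
	return c.started, err
}

func main() {
	// Cancel the context on Ctrl-C or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also stop after a short timeout so this sketch exits on its own.
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	c := &noopConsumer{}
	err := runUntilDone(ctx, c)
	fmt.Println("started:", c.started, "err:", err)
}
```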

func (*RedisStream) UseDebug

func (r *RedisStream) UseDebug(fn LogFn)

func (*RedisStream) UseError

func (r *RedisStream) UseError(fn LogFn)

func (*RedisStream) UseInfo

func (r *RedisStream) UseInfo(fn LogFn)

Directories

Path Synopsis
Package mocks is a generated GoMock package.