submux

submux is a smart Pub/Sub multiplexer for Redis Cluster in Go. It significantly reduces the number of connections required for high-volume Pub/Sub applications by multiplexing multiple subscriptions over a small pool of dedicated connections.

Features

  • Smart Multiplexing: Automatically routes subscriptions to the correct connection based on hashslots.
  • Topology Aware: Monitors Redis Cluster topology changes and handles hashslot migrations automatically.
  • Resilient: Background event loop manages connection health and auto-reconnects.
  • Scalable: Distributes read load across replicas (optional).
  • Production Ready: Optional OpenTelemetry metrics for observability (zero overhead when disabled).
  • Drop-in Ready: Built on top of the standard go-redis/v9 library.

Installation

go get github.com/lalloni/submux

Requirements

  • Go: 1.25.6 or later
  • Redis: 6.0+ (Redis 7.0+ required for sharded pub/sub via SSubscribeSync)
  • go-redis: v9

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/redis/go-redis/v9"
	"github.com/lalloni/submux"
)

func main() {
	// 1. Initialize go-redis cluster client
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"localhost:7000", "localhost:7001"},
	})
	defer rdb.Close()

	// 2. Create SubMux instance with auto-resubscribe enabled
	sm, err := submux.New(rdb,
		submux.WithAutoResubscribe(true),         // Handle migrations automatically
		submux.WithNodePreference(submux.BalancedAll), // Distribute across all nodes
	)
	if err != nil {
		log.Fatalf("Failed to create SubMux: %v", err)
	}
	defer sm.Close()

	// 3. Subscribe to channels
	ctx := context.Background()
	sub, err := sm.SubscribeSync(ctx, []string{"my-channel"}, func(ctx context.Context, msg *submux.Message) {
		// Handle different message types
		switch msg.Type {
		case submux.MessageTypeMessage:
			fmt.Printf("Message on %s: %s\n", msg.Channel, msg.Payload)
		case submux.MessageTypeSignal:
			// Topology change notification (migration, node failure, etc.)
			log.Printf("Signal: %s - %s", msg.Signal.EventType, msg.Signal.Details)
		}
	})
	if err != nil {
		log.Fatalf("Failed to subscribe: %v", err)
	}

	// 4. Cleanup when done
	defer sub.Unsubscribe(ctx)

	// Keep alive...
	select {}
}
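
To see messages arrive, publish from another goroutine or process. A minimal publisher sketch using the same go-redis client (the channel name matches the subscription above):

// PUBLISH fans out cluster-wide, so any node will route the message.
if err := rdb.Publish(context.Background(), "my-channel", "hello").Err(); err != nil {
	log.Printf("publish failed: %v", err)
}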

Subscription Types

submux supports all three Redis pub/sub mechanisms:

1. Regular Subscribe (SUBSCRIBE)

Subscribe to specific channels by exact name:

sub, err := sm.SubscribeSync(ctx, []string{"orders", "payments"}, func(ctx context.Context, msg *submux.Message) {
	fmt.Printf("Channel %s: %s\n", msg.Channel, msg.Payload)
})

2. Pattern Subscribe (PSUBSCRIBE)

Subscribe using glob-style patterns:

sub, err := sm.PSubscribeSync(ctx, []string{"news:*", "logs:error:*"}, func(ctx context.Context, msg *submux.Message) {
	// msg.Pattern contains the matched pattern
	// msg.Channel contains the actual channel name
	fmt.Printf("Pattern %s matched channel %s: %s\n", msg.Pattern, msg.Channel, msg.Payload)
})

Supported wildcards: ? (single character), * (zero or more characters), [abc] (character set)
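
For instance, assuming the two patterns above are active, publishes route as follows (an illustrative sketch using go-redis; the channel names are hypothetical):

rdb.Publish(ctx, "news:sports", "goal!")      // delivered via "news:*"
rdb.Publish(ctx, "logs:error:db", "timeout")  // delivered via "logs:error:*"
rdb.Publish(ctx, "logs:info:db", "connected") // matches neither pattern; not delivered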

3. Sharded Subscribe (SSUBSCRIBE) - Redis 7.0+

Cluster-aware pub/sub with guaranteed node affinity and lower overhead:

sub, err := sm.SSubscribeSync(ctx, []string{"events", "notifications"}, func(ctx context.Context, msg *submux.Message) {
	fmt.Printf("Sharded channel %s: %s\n", msg.Channel, msg.Payload)
})

Note: Requires Redis 7.0 or later. Falls back to regular SubscribeSync on older versions.
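
Shard channels are published to with SPUBLISH rather than PUBLISH. A minimal publisher sketch using go-redis (channel name taken from the example above):

// SPUBLISH routes to the node that owns the channel's hashslot,
// avoiding the cluster-wide fan-out of regular PUBLISH.
if err := rdb.SPublish(ctx, "events", `{"id": 42}`).Err(); err != nil {
	log.Printf("spublish failed: %v", err)
}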

Configuration Options

All options are configured via submux.New():

sm, err := submux.New(rdb,
	submux.WithAutoResubscribe(true),                    // Enable automatic migration handling
	submux.WithNodePreference(submux.BalancedAll),       // Node distribution strategy
	submux.WithTopologyPollInterval(2 * time.Second),    // Topology refresh rate
	submux.WithMigrationTimeout(30 * time.Second),       // Max time for migration resubscription
	submux.WithMigrationStallCheck(2 * time.Second),     // Stall detection interval
	submux.WithLogger(slog.New(slog.NewJSONHandler(os.Stdout, nil))), // Custom logger
	submux.WithMeterProvider(meterProvider),             // OpenTelemetry metrics (optional)
)

Available Options

Option                                   Default                Description
WithAutoResubscribe(bool)                false                  Enable automatic resubscription during hashslot migrations
WithNodePreference(NodePreference)       BalancedAll            Node distribution strategy (see below)
WithTopologyPollInterval(duration)       1s                     How often to poll cluster topology (min: 100ms)
WithMigrationTimeout(duration)           30s                    Max time to wait for migration resubscription (min: 1s)
WithMigrationStallCheck(duration)        2s                     How often to check for stalled migrations (min: 100ms)
WithCallbackWorkers(int)                 runtime.NumCPU() * 2   Number of worker goroutines for callback execution
WithCallbackQueueSize(int)               10000                  Maximum pending callbacks in worker pool queue
WithSubscriptionQueueLimit(int)          100                    Max messages queued per subscription before dropping (0 = unlimited)
WithLogger(*slog.Logger)                 slog.Default()         Custom structured logger
WithMeterProvider(metric.MeterProvider)  nil                    OpenTelemetry metrics provider (opt-in)

Per-Subscription Options

Subscribe methods accept optional SubscribeOption parameters to override defaults for individual subscriptions:

// Override the queue limit for a high-throughput subscription
sub, err := sm.SubscribeSync(ctx, []string{"firehose"}, callback,
    submux.WithQueueLimit(10000),  // allow larger buffer for this subscription
)

// Disable queue limit entirely for a specific subscription
sub, err := sm.SubscribeSync(ctx, []string{"critical"}, callback,
    submux.WithQueueLimit(0),  // unlimited (no dropping)
)

Option               Default               Description
WithQueueLimit(int)  SubMux default (100)  Max messages queued per subscription before dropping (0 = unlimited)

Node Distribution Strategies

Control how subscriptions are distributed across cluster nodes:

// BalancedAll (default, recommended): Distribute equally across ALL nodes (masters + replicas)
// - Best resource utilization
// - Optimal for clusters with many nodes
sm, _ := submux.New(rdb, submux.WithNodePreference(submux.BalancedAll))

// PreferMasters: Route all subscriptions to master nodes only
// - Legacy behavior
// - Use if replicas are unreliable
sm, _ := submux.New(rdb, submux.WithNodePreference(submux.PreferMasters))

// PreferReplicas: Prefer replica nodes to protect write-saturated masters
// - Offloads read operations from masters
// - Falls back to masters if no replicas available
sm, _ := submux.New(rdb, submux.WithNodePreference(submux.PreferReplicas))

Handling Signal Messages

SubMux sends signal messages to notify your application of cluster topology events, regardless of auto-resubscribe configuration.

Signal Delivery Guarantees

✅ Signals are ALWAYS sent - even when WithAutoResubscribe(true) is enabled
✅ Signals are sent ONCE per event - not continuously during the event
✅ Signals are best-effort - they may be lost if the callback queue is full

When Signals Are Sent

Event                               Signal Sent  EventType              Description
Hashslot migration detected         Yes          EventMigration         MOVED error received, resubscription starting
Migration resubscription completed  No           N/A                    Silent - resubscription succeeds automatically
Migration stalled (no progress)     Yes          EventMigrationStalled  Resubscription is stuck, manual intervention may be needed
Migration timeout reached           Yes          EventMigrationTimeout  Resubscription exceeded the configured timeout
Node failure detected               Yes          EventNodeFailure       Connection to node lost
Subscription queue overflow         Yes          EventQueueOverflow     Per-subscription queue full, messages being dropped
Topology refresh occurred           No           N/A                    Background polling does not trigger signals

Relationship with Auto-Resubscribe

Auto-Resubscribe OFF (default):

sm, _ := submux.New(rdb) // auto-resubscribe disabled

sub, _ := sm.SubscribeSync(ctx, []string{"channel"},
    func(ctx context.Context, msg *submux.Message) {
        if msg.Type == submux.MessageTypeSignal {
            // You MUST handle resubscription manually
            log.Printf("Migration detected: %v", msg.Signal)
            // Resubscribe logic here
        }
    })

Auto-Resubscribe ON:

sm, _ := submux.New(rdb, submux.WithAutoResubscribe(true))

sub, _ := sm.SubscribeSync(ctx, []string{"channel"},
    func(ctx context.Context, msg *submux.Message) {
        if msg.Type == submux.MessageTypeSignal {
            // Signals are STILL sent, but resubscription is automatic
            log.Printf("Migration occurred: %v", msg.Signal)
            // No action needed, just for observability
        }
    })

Key Point: Signals provide observability into cluster events. Auto-resubscribe provides automatic recovery. They work together, not as alternatives.

Signal Handling Example

Critical for production: Always handle MessageTypeSignal in callbacks to monitor topology events:

sub, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	switch msg.Type {
	case submux.MessageTypeMessage:
		// Normal pub/sub message
		processOrder(msg.Payload)

	case submux.MessageTypeSignal:
		// Topology change notification
		switch msg.Signal.EventType {
		case submux.EventMigration:
			// Hashslot migration detected (auto-resubscribe initiated if enabled)
			log.Printf("Migration: hashslot %d from %s to %s",
				msg.Signal.Hashslot, msg.Signal.OldNode, msg.Signal.NewNode)
			metrics.RecordMigration()

		case submux.EventNodeFailure:
			// Connection to Redis node lost
			log.Printf("Node failure: %s - %s", msg.Signal.OldNode, msg.Signal.Details)
			alerts.SendAlert("Redis node failure detected")

		case submux.EventMigrationStalled:
			// Migration resubscription made no progress for 2+ seconds
			log.Printf("Migration stalled: %s", msg.Signal.Details)
			alerts.SendWarning("Migration stalled")

		case submux.EventMigrationTimeout:
			// Migration resubscription exceeded 30 seconds
			log.Printf("Migration timeout: %s", msg.Signal.Details)
			alerts.SendCritical("Migration timeout - manual intervention may be required")

		case submux.EventQueueOverflow:
			// Per-subscription queue full, messages being dropped
			log.Printf("Queue overflow: %s (dropped %d)", msg.Signal.Details, msg.Signal.DroppedCount)
			alerts.SendWarning("Subscription queue overflow - callback too slow")
		}
	}
})

Error Handling

Subscription Errors

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

sub, err := sm.SubscribeSync(ctx, []string{"my-channel"}, callback)
if err != nil {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		// Redis didn't confirm subscription within timeout
		log.Printf("Subscription timeout - cluster may be slow or unhealthy")

	case errors.Is(err, submux.ErrInvalidChannel):
		// Channel name is empty or invalid
		log.Printf("Invalid channel name")

	case errors.Is(err, submux.ErrSubscriptionFailed):
		// Redis returned an error during subscription
		log.Printf("Subscription failed: %v", err)

	case errors.Is(err, submux.ErrConnectionFailed):
		// Failed to connect to Redis node
		log.Printf("Connection failed: %v", err)

	default:
		log.Printf("Unexpected error: %v", err)
	}
	return
}
defer sub.Unsubscribe(context.Background())

Unsubscribe Errors

// Always use a fresh context for unsubscribe (don't reuse expired contexts)
unsubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := sub.Unsubscribe(unsubCtx); err != nil {
	// Unsubscribe errors are usually safe to ignore
	// The connection will be cleaned up automatically
	log.Printf("Unsubscribe warning: %v", err)
}

Callback Panics

Callbacks are automatically wrapped with panic recovery. Panics are logged but don't crash the application:

sub, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	// If this panics, it's caught and logged, other subscriptions continue working
	riskyOperation(msg.Payload)
})

Migration Handling

Without Auto-Resubscribe (default)

By default, auto-resubscribe is disabled. Your application must handle migrations manually:

sm, _ := submux.New(rdb) // auto-resubscribe is false by default

// Declare sub beforehand so the callback can reference it.
var sub *submux.Sub
sub, _ = sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	if msg.Type == submux.MessageTypeSignal && msg.Signal.EventType == submux.EventMigration {
		log.Printf("Migration detected - application must handle resubscription")

		// Option 1: Manually resubscribe (callback is the application's handler)
		sub.Unsubscribe(ctx)
		newSub, _ := sm.SubscribeSync(ctx, []string{"orders"}, callback)
		_ = newSub // hand the new subscription to the application

		// Option 2: Restart the application
		// Option 3: Ignore if messages are not critical
	}
})

With Auto-Resubscribe

Enable automatic migration handling:

sm, _ := submux.New(rdb,
	submux.WithAutoResubscribe(true), // Enable automatic resubscription
)

sub, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	switch msg.Type {
	case submux.MessageTypeMessage:
		// Process normal messages
		processOrder(msg.Payload)

	case submux.MessageTypeSignal:
		// Still receive signals for monitoring, but resubscription is automatic
		if msg.Signal.EventType == submux.EventMigration {
			log.Printf("Migration detected - automatic resubscription in progress")
			metrics.RecordMigration()
		}
	}
})

How auto-resubscribe works:

  1. Topology monitor detects hashslot migration (via polling or MOVED errors)
  2. Signal message sent to callback with EventMigration
  3. Old subscription is automatically unsubscribed
  4. New subscription is automatically created on the new node
  5. If resubscription stalls (>2s) or times out (>30s), additional signals are sent

Examples

Multiple Subscriptions to Same Channel

Different callbacks can subscribe to the same channel independently:

// Subscriber 1: Process orders
sub1, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	processOrder(msg.Payload)
})

// Subscriber 2: Log orders for audit
sub2, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	auditLog(msg.Payload)
})

// Subscriber 3: Update metrics
sub3, _ := sm.SubscribeSync(ctx, []string{"orders"}, func(ctx context.Context, msg *submux.Message) {
	metrics.Increment("orders.received")
})

// Unsubscribing one doesn't affect the others
sub1.Unsubscribe(ctx) // sub2 and sub3 continue receiving messages

Pattern Matching with Multiple Patterns

sub, _ := sm.PSubscribeSync(ctx,
	[]string{"logs:*", "events:user:*", "metrics:cpu:*"},
	func(ctx context.Context, msg *submux.Message) {
		// msg.Pattern tells you which pattern matched
		// msg.Channel tells you the actual channel name
		log.Printf("[%s] %s: %s", msg.Pattern, msg.Channel, msg.Payload)
	},
)

Graceful Shutdown

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/lalloni/submux"
)

func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"localhost:7000"},
	})
	defer rdb.Close()

	sm, _ := submux.New(rdb, submux.WithAutoResubscribe(true))
	defer sm.Close()

	// Subscribe to channels
	ctx := context.Background()
	sub, _ := sm.SubscribeSync(ctx, []string{"orders"}, handleOrder)

	// Handle shutdown signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Wait for shutdown signal
	<-sigChan
	log.Println("Shutting down gracefully...")

	// Create timeout context for cleanup
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Unsubscribe and cleanup
	if err := sub.Unsubscribe(shutdownCtx); err != nil {
		log.Printf("Unsubscribe error during shutdown: %v", err)
	}

	sm.Close() // Closes all connections and stops topology monitor
	rdb.Close()

	log.Println("Shutdown complete")
}

// handleOrder is the application's message callback, stubbed so the example is self-contained.
func handleOrder(ctx context.Context, msg *submux.Message) {
	log.Printf("order on %s: %s", msg.Channel, msg.Payload)
}

Production-Ready Setup with Observability

package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/redis/go-redis/v9"
	"github.com/lalloni/submux"
	"go.opentelemetry.io/otel/exporters/prometheus"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// 1. Setup structured logging
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	// 2. Setup OpenTelemetry metrics
	exporter, _ := prometheus.New()
	meterProvider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(exporter),
	)

	// 3. Setup Redis cluster client
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{
			"redis-cluster-0:6379",
			"redis-cluster-1:6379",
			"redis-cluster-2:6379",
		},
		MaxRetries: 3,
		PoolSize:   50,
	})
	defer rdb.Close()

	// 4. Create production-ready SubMux
	sm, err := submux.New(rdb,
		submux.WithAutoResubscribe(true),                 // Handle migrations automatically
		submux.WithNodePreference(submux.BalancedAll),    // Distribute across all nodes
		submux.WithTopologyPollInterval(2*time.Second),   // Poll topology every 2s
		submux.WithMigrationTimeout(30*time.Second),      // 30s timeout for migrations
		submux.WithMigrationStallCheck(2*time.Second),    // Check for stalls every 2s
		submux.WithLogger(logger),                        // Structured logging
		submux.WithMeterProvider(meterProvider),          // OpenTelemetry metrics
	)
	if err != nil {
		logger.Error("Failed to create SubMux", "error", err)
		os.Exit(1)
	}
	defer sm.Close()

	// 5. Subscribe with comprehensive error handling
	ctx := context.Background()
	sub, err := sm.SubscribeSync(ctx, []string{"orders", "payments"}, handleMessage)
	if err != nil {
		logger.Error("Failed to subscribe", "error", err)
		os.Exit(1)
	}
	defer sub.Unsubscribe(context.Background())

	// 6. Expose Prometheus metrics
	http.Handle("/metrics", promhttp.Handler())
	go http.ListenAndServe(":9090", nil)

	// 7. Wait for shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	<-sigChan

	logger.Info("Shutting down gracefully...")
}

func handleMessage(ctx context.Context, msg *submux.Message) {
	switch msg.Type {
	case submux.MessageTypeMessage:
		// Process message
		processMessage(msg.Channel, msg.Payload)

	case submux.MessageTypeSignal:
		// Monitor topology events
		switch msg.Signal.EventType {
		case submux.EventMigration:
			slog.Info("Migration detected",
				"hashslot", msg.Signal.Hashslot,
				"old_node", msg.Signal.OldNode,
				"new_node", msg.Signal.NewNode)

		case submux.EventNodeFailure:
			slog.Error("Node failure",
				"node", msg.Signal.OldNode,
				"details", msg.Signal.Details)

		case submux.EventMigrationStalled:
			slog.Warn("Migration stalled", "details", msg.Signal.Details)

		case submux.EventMigrationTimeout:
			slog.Error("Migration timeout", "details", msg.Signal.Details)
		}
	}
}

OpenTelemetry Metrics (Optional)

submux provides optional OpenTelemetry instrumentation for production observability. Metrics are opt-in and have zero overhead when disabled.

Basic Setup

import (
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/exporters/prometheus"
)

// Enable metrics with Prometheus exporter
exporter, _ := prometheus.New()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))

sm, _ := submux.New(rdb,
    submux.WithMeterProvider(provider),  // Enable metrics
    submux.WithAutoResubscribe(true),
)

Available Metrics

13 Counters:

  • submux.messages.received - Messages received from Redis (by type, node)
  • submux.callbacks.invoked - Callback invocations (by subscription type)
  • submux.callbacks.panics - Panic recoveries (by subscription type)
  • submux.subscriptions.attempts - Subscription attempts (by type, success)
  • submux.connections.created - Connections created (by node)
  • submux.connections.failed - Connection failures (by node)
  • submux.migrations.started - Hashslot migrations detected
  • submux.migrations.completed - Migrations completed successfully
  • submux.migrations.stalled - Migrations stalled (>2s no progress)
  • submux.migrations.timeout - Migrations timed out (>30s)
  • submux.topology.refreshes - Topology refresh attempts (by success)
  • submux.workerpool.submissions - Callback submissions to worker pool (blocked attribute)
  • submux.workerpool.dropped - Callbacks dropped when pool stopped

5 Histograms:

  • submux.callbacks.latency - Callback execution time (milliseconds)
  • submux.messages.latency - End-to-end message latency (milliseconds)
  • submux.migrations.duration - Migration completion time (milliseconds)
  • submux.topology.refresh_latency - Topology refresh time (milliseconds)
  • submux.workerpool.queue_wait - Queue wait time before callback execution (milliseconds)

5 Observable Gauges:

  • submux.workerpool.queue_depth - Current tasks in worker pool queue
  • submux.workerpool.queue_capacity - Maximum worker pool queue capacity
  • submux.connections.active - Active Redis PubSub connections
  • submux.subscriptions.redis - Active Redis-level subscriptions
  • submux.subscriptions.active - Active SubMux subscription handles

Performance Characteristics

  • Disabled (default): 0.1 ns per operation (effectively zero overhead)
  • Enabled: ~200 ns per operation (minimal impact)
  • Cardinality-safe: No channel names in attributes (prevents metric explosion)
  • Build tag support: Compile without OTEL dependencies using -tags nometrics

Build Without Metrics

# Build without OpenTelemetry dependencies (smaller binary)
go build -tags nometrics -o myapp

Troubleshooting

Messages Not Being Received

Symptoms: Callbacks are not invoked, or only some messages are delivered.

Possible causes and solutions:

  1. Channel subscribed to wrong node

    • Check logs for "MOVED" errors
    • Verify WithAutoResubscribe(true) is enabled for production
    • Check that cluster topology is stable with redis-cli cluster info
  2. Callback execution blocked

    • Check metrics: submux.workerpool.queue_depth and submux.workerpool.dropped
    • If queue depth is at capacity, increase WithCallbackQueueSize()
    • If callbacks are being dropped, increase WithCallbackWorkers()
    • Ensure callbacks don't block indefinitely
  3. Connection closed or failed

    • Check logs for "connection failed" messages
    • Check metrics: submux.connections.created vs submux.connections.failed
    • Verify Redis cluster is healthy: redis-cli cluster nodes

Slow Message Delivery

Symptoms: High latency between publish and callback invocation.

Diagnostics:

  • Check metric: submux.messages.latency histogram (if metrics enabled)
  • Check metric: submux.workerpool.queue_wait (time waiting in queue)
  • Check metric: submux.callbacks.latency (callback execution time)

Solutions:

  • If queue_wait is high: Increase WithCallbackWorkers() or WithCallbackQueueSize()
  • If callbacks.latency is high: Optimize callback logic or offload to async workers
  • If messages.latency is high but queue is empty: Check Redis cluster network latency

Migrations Not Being Handled

Symptoms: Subscriptions stop receiving messages after hashslot migration.

Diagnostics:

  • Check logs for "migration detected" or "MOVED" errors
  • Check metric: submux.migrations.started vs submux.migrations.completed
  • Check if submux.migrations.stalled or submux.migrations.timeout is increasing

Solutions:

  • Enable auto-resubscribe: WithAutoResubscribe(true)
  • Increase migration timeout if migrations are slow: WithMigrationTimeout(60 * time.Second)
  • Decrease stall check interval for faster detection: WithMigrationStallCheck(1 * time.Second)
  • Check Redis cluster health during migrations

Memory or Goroutine Leaks

Symptoms: Memory usage grows over time, goroutine count increases.

Diagnostics:

  • Use pprof to profile goroutines: go tool pprof http://localhost:6060/debug/pprof/goroutine
  • Check for unclosed subscriptions: Ensure sub.Unsubscribe(ctx) is called
  • Check metrics: Compare active subscriptions count with expected

Solutions:

  • Always defer sub.Unsubscribe(ctx) after creating subscriptions
  • Call sm.Close() when shutting down to clean up all resources
  • Use context cancellation to stop long-running callbacks

Enabling Debug Logging

import "log/slog"

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))

sm, err := submux.New(rdb, submux.WithLogger(logger))

Debug logs include:

  • Connection lifecycle events
  • Subscription state transitions
  • Migration detection and resubscription
  • Error conditions and recovery

Getting Help

If you're still experiencing issues:

  1. Enable debug logging and collect logs during the problem period
  2. Collect metrics (if enabled) showing the behavior
  3. Open an issue at https://github.com/lalloni/submux/issues with:
    • Go version and submux version
    • Minimal reproducible example
    • Logs and metrics
    • Redis cluster configuration

Documentation

User Documentation

  • README.md (this file) - User guide, examples, configuration
  • CHANGELOG.md - Version history, release notes, migration guides

Technical Documentation

  • DESIGN.md - Architecture, design decisions, implementation details
  • AGENTS.md - Development workflows, testing strategies, conventions

Contributing

For development setup, testing strategies, code conventions, and contribution guidelines, see AGENTS.md.

License

MIT

Documentation

Overview

Package submux provides connection multiplexing for Redis Cluster Pub/Sub operations.

submux minimizes the number of required Pub/Sub connections by intelligently multiplexing multiple subscriptions over a small number of dedicated connections. It supports regular subscriptions (SUBSCRIBE), pattern subscriptions (PSUBSCRIBE), and sharded subscriptions (SSUBSCRIBE) in Redis Cluster environments.

Key features:

  • Hashslot-based connection reuse across subscriptions
  • Load balancing across master and replica nodes
  • Multiple subscriptions to the same channel with independent callbacks
  • Automatic topology change detection and signal message delivery
  • Thread-safe operations

Basic Usage

The simplest way to use submux is to create a SubMux instance and subscribe to channels:

clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{"localhost:7000", "localhost:7001"},
})

subMux, err := submux.New(clusterClient)
if err != nil {
    log.Fatal(err)
}
defer subMux.Close()

sub, err := subMux.SubscribeSync(context.Background(), []string{"mychannel"}, func(ctx context.Context, msg *submux.Message) {
    switch msg.Type {
    case submux.MessageTypeMessage:
        fmt.Printf("Received message on %s: %s\n", msg.Channel, msg.Payload)
    case submux.MessageTypeSignal:
        fmt.Printf("Topology change: %s\n", msg.Signal.Details)
    }
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())

Pattern Subscriptions

Subscribe to multiple channels matching a pattern:

sub, err := subMux.PSubscribeSync(context.Background(), []string{"news:*"}, func(ctx context.Context, msg *submux.Message) {
    fmt.Printf("Pattern match: channel=%s, payload=%s\n", msg.Channel, msg.Payload)
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())

Sharded Subscriptions

For Redis 7.0+ sharded pub/sub:

sub, err := subMux.SSubscribeSync(context.Background(), []string{"shardchannel"}, func(ctx context.Context, msg *submux.Message) {
    fmt.Printf("Sharded message: %s\n", msg.Payload)
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())

Multiple Subscriptions to Same Channel

You can subscribe to the same channel multiple times with different callbacks:

// First subscription
sub1, err := subMux.SubscribeSync(ctx, []string{"events"}, func(ctx context.Context, msg *submux.Message) {
    log.Printf("Logger: %s", msg.Payload)
})
if err != nil {
    log.Fatal(err)
}

// Second subscription to the same channel
sub2, err := subMux.SubscribeSync(ctx, []string{"events"}, func(ctx context.Context, msg *submux.Message) {
    metrics.Increment("events.received")
})
if err != nil {
    log.Fatal(err)
}

// Both callbacks will be invoked when messages arrive on "events".
// To unsubscribe just the first callback:
// sub1.Unsubscribe(ctx)

Configuration Options

Configure SubMux behavior using options:

subMux, err := submux.New(clusterClient,
    submux.WithAutoResubscribe(true),              // Auto-resubscribe on topology changes
    submux.WithNodePreference(submux.PreferReplicas), // Prefer replica nodes (WithReplicaPreference is deprecated)
    submux.WithTopologyPollInterval(2*time.Second), // Poll topology every 2 seconds
)

Handling Topology Changes

SubMux automatically detects hashslot migrations and sends signal messages:

sub, err := subMux.SubscribeSync(ctx, []string{"mychannel"}, func(ctx context.Context, msg *submux.Message) {
    if msg.Type == submux.MessageTypeSignal {
        switch msg.Signal.EventType {
        case submux.EventMigration:
            fmt.Printf("Hashslot %d migrated from %s to %s\n",
                msg.Signal.Hashslot, msg.Signal.OldNode, msg.Signal.NewNode)
        case submux.EventNodeFailure:
            fmt.Printf("Node failure: %s\n", msg.Signal.Details)
        }
    } else {
        // Handle regular message
        processMessage(msg)
    }
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(ctx)

Unsubscribing

Unsubscribe from channels when no longer needed using the Sub returned from SubscribeSync, PSubscribeSync, or SSubscribeSync:

sub, err := subMux.SubscribeSync(ctx, []string{"mychannel"}, callback)
if err != nil {
    log.Fatal(err)
}
// Later, when done:
sub.Unsubscribe(ctx)

Each Sub represents a specific callback. If you subscribe to the same channel multiple times with different callbacks, each returns its own Sub that can be unsubscribed independently.

Best Practices

  1. Always call Close() when done to clean up resources
  2. Handle signal messages to be aware of topology changes
  3. Use context cancellation for subscription operations
  4. Enable auto-resubscribe if you want automatic recovery from migrations
  5. Choose a node preference that fits your cluster: BalancedAll (the recommended default) or PreferReplicas to offload masters

See DESIGN.md for more detailed guidance.

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidClusterClient is returned when an invalid cluster client is provided.
	ErrInvalidClusterClient = errors.New("submux: invalid cluster client")

	// ErrInvalidChannel is returned when a channel name is invalid.
	ErrInvalidChannel = errors.New("submux: invalid channel name")

	// ErrSubscriptionFailed is returned when a subscription operation fails.
	ErrSubscriptionFailed = errors.New("submux: subscription failed")

	// ErrConnectionFailed is returned when a connection operation fails.
	ErrConnectionFailed = errors.New("submux: connection failed")

	// ErrClosed is returned when an operation is attempted on a closed SubMux.
	ErrClosed = errors.New("submux: SubMux is closed")

	// ErrEventLoopStopped is returned when an operation cannot complete because
	// the event loop goroutine has exited (due to Redis error or connection close).
	ErrEventLoopStopped = errors.New("submux: event loop stopped")
)

Functions

func Hashslot

func Hashslot(channel string) int

Hashslot calculates the Redis hashslot for a channel name. It uses the same hashslot calculation as go-redis/v9 (from internal/hashtag package), which supports Redis hashtag syntax: if the channel contains {tag}, only the tag portion is used for hashslot calculation.

This implementation matches go-redis/v9's internal/hashtag.Slot function exactly.
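
For example, channels that share a hashtag map to the same slot, which lets SubMux serve them over one connection (an illustrative sketch):

slotA := submux.Hashslot("orders:{tenant42}")
slotB := submux.Hashslot("invoices:{tenant42}")
fmt.Println(slotA == slotB) // true: only "tenant42" is hashed for both channels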

Types

type EventType

type EventType string

EventType represents the type of a signal event.

const (
	EventNodeFailure      EventType = "node_failure"
	EventMigration        EventType = "migration"
	EventMigrationTimeout EventType = "migration_timeout"
	EventMigrationStalled EventType = "migration_stalled"
	EventQueueOverflow    EventType = "queue_overflow"
)

Event types for SignalInfo.

type Message

type Message struct {
	// Type indicates the message type.
	Type MessageType

	// Channel is the channel name (for regular messages).
	Channel string

	// Payload is the message payload.
	Payload string

	// Pattern is the pattern that matched (for PSUBSCRIBE/SSUBSCRIBE messages).
	Pattern string

	// Signal contains signal information (if Type is MessageTypeSignal).
	Signal *SignalInfo

	// Timestamp is when the message was received.
	Timestamp time.Time

	// SubscriptionType indicates the subscription method (subscribe, psubscribe, ssubscribe).
	// Used internally for metrics attribution.
	SubscriptionType subscriptionType
}

Message represents a message received from a subscription or a signal notification.

type MessageCallback

type MessageCallback func(ctx context.Context, msg *Message)

MessageCallback is a function type for handling subscription events (messages and signal notifications). Callbacks are invoked asynchronously via a bounded worker pool and must be thread-safe. The context is derived from the SubMux lifecycle and is canceled when Close() is called.
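
Because callbacks run concurrently on the worker pool, any shared state they touch must be synchronized. A minimal sketch guarding a counter with a mutex (names are illustrative):

var (
    mu    sync.Mutex
    count int
)

cb := submux.MessageCallback(func(ctx context.Context, msg *submux.Message) {
    mu.Lock()
    count++ // shared state mutated only under the lock
    mu.Unlock()
})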

type MessageType

type MessageType int

MessageType represents the type of a message received from a subscription.

const (
	// MessageTypeMessage is a regular SUBSCRIBE message.
	MessageTypeMessage MessageType = iota
	// MessageTypePMessage is a pattern PSUBSCRIBE message.
	MessageTypePMessage
	// MessageTypeSMessage is a sharded SSUBSCRIBE message.
	MessageTypeSMessage
	// MessageTypeSignal is a signal notification (topology change).
	MessageTypeSignal
)

type NodePreference

type NodePreference int

NodePreference determines the strategy for distributing subscriptions across cluster nodes.

const (
	// PreferMasters routes all subscriptions to master nodes only.
	// Use this when: You want to minimize the number of nodes involved in pub/sub,
	// or when masters have spare capacity and you want centralized routing.
	PreferMasters NodePreference = iota

	// BalancedAll distributes subscriptions equally across all nodes (masters and replicas) within each shard.
	// This is the recommended default for most workloads as it:
	// - Maximizes resource utilization across all infrastructure
	// - Provides better failure characteristics (smaller blast radius)
	// - Works well for both light and heavy pub/sub loads
	BalancedAll

	// PreferReplicas routes subscriptions to replica nodes, avoiding masters when possible.
	// Use this when: Masters are write-saturated and you need to protect them from additional pub/sub load.
	// Falls back to masters if no replicas are available.
	PreferReplicas
)

type Option

type Option func(*config)

Option is a function type for configuring a SubMux instance.

func WithAutoResubscribe

func WithAutoResubscribe(enabled bool) Option

WithAutoResubscribe enables automatic resubscription when hashslot migrations occur.

func WithCallbackQueueSize

func WithCallbackQueueSize(size int) Option

WithCallbackQueueSize sets the maximum number of pending callbacks in the worker pool queue. When the queue is full, new callbacks will block until space is available, providing backpressure to the message processing pipeline. Default is 10000.

func WithCallbackWorkers

func WithCallbackWorkers(workers int) Option

WithCallbackWorkers sets the number of worker goroutines in the callback worker pool. The worker pool bounds the number of concurrent callback executions, preventing goroutine explosion under high message throughput. Default is runtime.NumCPU() * 2.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets the structured logger to use.

func WithMeterProvider

func WithMeterProvider(provider metric.MeterProvider) Option

WithMeterProvider sets the OpenTelemetry MeterProvider for metrics collection. Metrics are opt-in - if not provided, all metrics operations become no-ops with zero overhead.

Example usage with Prometheus:

import (
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/exporters/prometheus"
)

exporter, _ := prometheus.New()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
subMux, _ := submux.New(clusterClient, submux.WithMeterProvider(provider))

func WithMigrationStallCheck

func WithMigrationStallCheck(interval time.Duration) Option

WithMigrationStallCheck sets how often to check for stalled migration resubscription progress. Default is 2 seconds.

func WithMigrationTimeout

func WithMigrationTimeout(timeout time.Duration) Option

WithMigrationTimeout sets the maximum duration to wait for migration resubscription to complete before timing out. Default is 30 seconds.

func WithNodePreference

func WithNodePreference(preference NodePreference) Option

WithNodePreference sets the strategy for distributing subscriptions across cluster nodes. Available options:

  • PreferMasters: Route subscriptions to master nodes only (legacy behavior)
  • BalancedAll: Distribute equally across all nodes - masters and replicas (recommended default)
  • PreferReplicas: Prefer replicas to protect write-saturated masters

func WithReplicaPreference

func WithReplicaPreference(preferReplicas bool) Option

WithReplicaPreference sets preference for using replica nodes over master nodes. Deprecated: Use WithNodePreference(PreferReplicas) instead for clearer intent, or WithNodePreference(BalancedAll) for better default behavior.

func WithSubscriptionQueueLimit

func WithSubscriptionQueueLimit(limit int) Option

WithSubscriptionQueueLimit sets the maximum number of messages that can be queued per subscription before messages are dropped (tail-drop). When the limit is reached, new messages are discarded and an EventQueueOverflow signal is delivered to the callback. This limit can be overridden per-subscription using WithQueueLimit on SubscribeOption. Set to 0 for unlimited (no dropping). Default is 100.

func WithTopologyPollInterval

func WithTopologyPollInterval(interval time.Duration) Option

WithTopologyPollInterval sets how often to poll the cluster topology for changes. The minimum recommended interval is 1 second. Shorter intervals may increase load on the Redis cluster.

type SignalInfo

type SignalInfo struct {
	// EventType is the type of event; see the EventType constants (EventMigration, EventNodeFailure, etc.).
	EventType EventType

	// Hashslot is the affected hashslot (0-16383).
	Hashslot int

	// OldNode is the previous node address (if applicable).
	OldNode string

	// NewNode is the new node address (if applicable).
	NewNode string

	// Details contains additional details about the event.
	Details string

	// DroppedCount is the number of messages dropped since the overflow episode began.
	// Only meaningful when EventType is EventQueueOverflow.
	DroppedCount int
}

SignalInfo contains information about cluster topology changes or hashslot migrations.

type Sub

type Sub struct {
	// contains filtered or unexported fields
}

Sub represents a subscription to one or more channels/patterns with a specific callback. Sub is returned from SubscribeSync, PSubscribeSync, or SSubscribeSync and can be used to unsubscribe the specific callback that was provided during subscription.

func (*Sub) Unsubscribe

func (s *Sub) Unsubscribe(ctx context.Context) error

Unsubscribe unsubscribes the callback associated with this Sub from all channels/patterns it was subscribed to. It is safe to call multiple times.

type SubMux

type SubMux struct {
	// contains filtered or unexported fields
}

SubMux provides connection multiplexing for Redis Cluster Pub/Sub operations.

func New

func New(clusterClient *redis.ClusterClient, opts ...Option) (*SubMux, error)

New creates a new SubMux instance wrapping a ClusterClient.

func (*SubMux) Close

func (sm *SubMux) Close() error

Close closes all subscriptions and connections, stops all goroutines, and releases resources. It is safe to call multiple times.

func (*SubMux) PSubscribeSync

func (sm *SubMux) PSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)

PSubscribeSync subscribes to one or more channel patterns using pattern subscription. It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.

func (*SubMux) SSubscribeSync

func (sm *SubMux) SSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)

SSubscribeSync subscribes to one or more shard channels using sharded subscription (Redis 7.0+). It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.

func (*SubMux) SubscribeSync

func (sm *SubMux) SubscribeSync(ctx context.Context, channels []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)

SubscribeSync subscribes to one or more channels using regular channel subscription. It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.

type SubscribeOption

type SubscribeOption func(*subscribeConfig)

SubscribeOption configures per-subscription behavior. Options are passed to SubscribeSync, PSubscribeSync, or SSubscribeSync.

func WithQueueLimit

func WithQueueLimit(limit int) SubscribeOption

WithQueueLimit overrides the default subscription queue limit for this subscription. When the queue reaches the limit, new messages are dropped (tail-drop) and an EventQueueOverflow signal is delivered to the callback. Set to 0 for unlimited (no dropping). If not called, the SubMux-level default from WithSubscriptionQueueLimit is used.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a bounded pool of worker goroutines for executing tasks. It provides backpressure when the queue is full, preventing goroutine explosion under high load.

func NewWorkerPool

func NewWorkerPool(workers, queueSize int) *WorkerPool

NewWorkerPool creates a new worker pool with the specified number of workers and queue size. If workers is 0, it defaults to runtime.NumCPU() * 2. If queueSize is 0, it defaults to 10000.

func (*WorkerPool) Context

func (wp *WorkerPool) Context() context.Context

Context returns the worker pool's context. This context is canceled when Stop() is called.

func (*WorkerPool) QueueCapacity

func (wp *WorkerPool) QueueCapacity() int

QueueCapacity returns the maximum queue capacity.

func (*WorkerPool) QueueLength

func (wp *WorkerPool) QueueLength() int

QueueLength returns the current number of tasks in the queue.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start starts the worker goroutines. It is safe to call multiple times.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the worker pool gracefully, waiting for all workers to finish. Tasks still in the queue will be processed before workers exit. It is safe to call multiple times.

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(task func()) bool

Submit submits a task to the worker pool. It blocks if the queue is full (providing backpressure). Returns false if the pool is stopped or the context is canceled.

func (*WorkerPool) SubmitWithContext

func (wp *WorkerPool) SubmitWithContext(ctx context.Context, task func()) bool

SubmitWithContext submits a task with a context for cancellation. Returns false if the pool is stopped, the context is canceled, or the provided context is done.

func (*WorkerPool) TrySubmit

func (wp *WorkerPool) TrySubmit(task func()) bool

TrySubmit attempts to submit a task without blocking. Returns true if the task was submitted, false if the queue is full or the pool is stopped.

func (*WorkerPool) Workers

func (wp *WorkerPool) Workers() int

Workers returns the number of worker goroutines.
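
A brief usage sketch of the exported pool (worker and queue sizes are illustrative):

wp := submux.NewWorkerPool(4, 100) // 4 workers, queue capacity 100
wp.Start()
defer wp.Stop() // drains queued tasks before workers exit

if ok := wp.Submit(func() {
    fmt.Println("ran on a pooled worker")
}); !ok {
    log.Println("pool stopped; task rejected")
}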
