Documentation ¶
Overview ¶
Package submux provides connection multiplexing for Redis Cluster Pub/Sub operations.
submux minimizes the number of Pub/Sub connections an application needs by multiplexing many subscriptions over a small number of dedicated connections. It supports regular subscriptions (SUBSCRIBE), pattern subscriptions (PSUBSCRIBE), and sharded subscriptions (SSUBSCRIBE) in Redis Cluster environments.
Key features:
- Hashslot-based connection reuse across subscriptions
- Load balancing across master and replica nodes
- Multiple subscriptions to the same channel with independent callbacks
- Automatic topology change detection and signal message delivery
- Thread-safe operations
Basic Usage ¶
The simplest way to use submux is to create a SubMux instance and subscribe to channels:
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{"localhost:7000", "localhost:7001"},
})
subMux, err := submux.New(clusterClient)
if err != nil {
    log.Fatal(err)
}
defer subMux.Close()
sub, err := subMux.SubscribeSync(context.Background(), []string{"mychannel"}, func(ctx context.Context, msg *submux.Message) {
    switch msg.Type {
    case submux.MessageTypeMessage:
        fmt.Printf("Received message on %s: %s\n", msg.Channel, msg.Payload)
    case submux.MessageTypeSignal:
        fmt.Printf("Topology change: %s\n", msg.Signal.Details)
    }
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())
Pattern Subscriptions ¶
Subscribe to multiple channels matching a pattern:
sub, err := subMux.PSubscribeSync(context.Background(), []string{"news:*"}, func(ctx context.Context, msg *submux.Message) {
    fmt.Printf("Pattern match: channel=%s, payload=%s\n", msg.Channel, msg.Payload)
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())
Sharded Subscriptions ¶
For Redis 7.0+ sharded pub/sub:
sub, err := subMux.SSubscribeSync(context.Background(), []string{"shardchannel"}, func(ctx context.Context, msg *submux.Message) {
    fmt.Printf("Sharded message: %s\n", msg.Payload)
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(context.Background())
Multiple Subscriptions to Same Channel ¶
You can subscribe to the same channel multiple times with different callbacks:
// First subscription
sub1, err := subMux.SubscribeSync(ctx, []string{"events"}, func(ctx context.Context, msg *submux.Message) {
    log.Printf("Logger: %s", msg.Payload)
})
if err != nil {
    log.Fatal(err)
}
// Second subscription to the same channel
sub2, err := subMux.SubscribeSync(ctx, []string{"events"}, func(ctx context.Context, msg *submux.Message) {
    metrics.Increment("events.received")
})
if err != nil {
    log.Fatal(err)
}
// Both callbacks will be invoked when messages arrive on "events".
// Unsubscribing sub1 removes only the first callback; sub2 keeps receiving.
defer sub2.Unsubscribe(ctx)
defer sub1.Unsubscribe(ctx)
Configuration Options ¶
Configure SubMux behavior using options:
subMux, err := submux.New(clusterClient,
    submux.WithAutoResubscribe(true),                  // Auto-resubscribe on topology changes
    submux.WithNodePreference(submux.PreferReplicas),  // Prefer replica nodes (replaces the deprecated WithReplicaPreference)
    submux.WithTopologyPollInterval(2*time.Second),    // Poll topology every 2 seconds
)
Handling Topology Changes ¶
SubMux automatically detects hashslot migrations and sends signal messages:
sub, err := subMux.SubscribeSync(ctx, []string{"mychannel"}, func(ctx context.Context, msg *submux.Message) {
    if msg.Type == submux.MessageTypeSignal {
        switch msg.Signal.EventType {
        case "migration":
            fmt.Printf("Hashslot %d migrated from %s to %s\n",
                msg.Signal.Hashslot, msg.Signal.OldNode, msg.Signal.NewNode)
        case "node_failure":
            fmt.Printf("Node failure: %s\n", msg.Signal.Details)
        }
    } else {
        // Handle regular message
        processMessage(msg)
    }
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe(ctx)
Unsubscribing ¶
Unsubscribe from channels when no longer needed using the Sub returned from SubscribeSync, PSubscribeSync, or SSubscribeSync:
sub, err := subMux.SubscribeSync(ctx, []string{"mychannel"}, callback)
if err != nil {
    log.Fatal(err)
}
// Later, when done:
sub.Unsubscribe(ctx)
Each Sub represents a specific callback. If you subscribe to the same channel multiple times with different callbacks, each returns its own Sub that can be unsubscribed independently.
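The idea that each handle removes exactly one callback can be illustrated with a small fan-out table. This is a minimal sketch under stated assumptions, not submux's implementation: the names registry, subscribe, and publish are hypothetical, the callback takes a plain string, and the sketch is not safe for concurrent use.

```go
package main

// registry is a hypothetical fan-out table: channel -> handle ID -> callback.
type registry struct {
	nextID int
	subs   map[string]map[int]func(payload string)
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]map[int]func(string))}
}

// subscribe registers cb on channel and returns a function that removes
// only this callback, leaving other callbacks on the same channel in place.
func (r *registry) subscribe(channel string, cb func(payload string)) (unsubscribe func()) {
	r.nextID++
	id := r.nextID
	if r.subs[channel] == nil {
		r.subs[channel] = make(map[int]func(string))
	}
	r.subs[channel][id] = cb
	return func() {
		delete(r.subs[channel], id)
		if len(r.subs[channel]) == 0 {
			// Last callback gone: the channel entry itself can be dropped.
			delete(r.subs, channel)
		}
	}
}

// publish delivers payload to every callback currently registered on channel.
func (r *registry) publish(channel, payload string) {
	for _, cb := range r.subs[channel] {
		cb(payload)
	}
}
```

In this sketch, dropping the channel entry once its last callback is removed stands in for the point at which a multiplexer could release the underlying server-side subscription.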
Best Practices ¶
- Always call Close() when done to clean up resources
- Handle signal messages to be aware of topology changes
- Use context cancellation for subscription operations
- Enable auto-resubscribe if you want automatic recovery from migrations
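- Choose a node preference that fits the workload: BalancedAll is the recommended default, and PreferReplicas reduces load on write-saturated master nodes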
See DESIGN.md for more detailed guidance.
Index ¶
- Variables
- func Hashslot(channel string) int
- type EventType
- type Message
- type MessageCallback
- type MessageType
- type NodePreference
- type Option
- func WithAutoResubscribe(enabled bool) Option
- func WithCallbackQueueSize(size int) Option
- func WithCallbackWorkers(workers int) Option
- func WithLogger(logger *slog.Logger) Option
- func WithMeterProvider(provider metric.MeterProvider) Option
- func WithMigrationStallCheck(interval time.Duration) Option
- func WithMigrationTimeout(timeout time.Duration) Option
- func WithNodePreference(preference NodePreference) Option
- func WithReplicaPreference(preferReplicas bool) Option
- func WithSubscriptionQueueLimit(limit int) Option
- func WithTopologyPollInterval(interval time.Duration) Option
- type SignalInfo
- type Sub
- type SubMux
- func (sm *SubMux) Close() error
- func (sm *SubMux) PSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, ...) (*Sub, error)
- func (sm *SubMux) SSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, ...) (*Sub, error)
- func (sm *SubMux) SubscribeSync(ctx context.Context, channels []string, callback MessageCallback, ...) (*Sub, error)
- type SubscribeOption
- type WorkerPool
- func (wp *WorkerPool) Context() context.Context
- func (wp *WorkerPool) QueueCapacity() int
- func (wp *WorkerPool) QueueLength() int
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Stop()
- func (wp *WorkerPool) Submit(task func()) bool
- func (wp *WorkerPool) SubmitWithContext(ctx context.Context, task func()) bool
- func (wp *WorkerPool) TrySubmit(task func()) bool
- func (wp *WorkerPool) Workers() int
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrInvalidClusterClient is returned when an invalid cluster client is provided.
    ErrInvalidClusterClient = errors.New("submux: invalid cluster client")
    // ErrInvalidChannel is returned when a channel name is invalid.
    ErrInvalidChannel = errors.New("submux: invalid channel name")
    // ErrSubscriptionFailed is returned when a subscription operation fails.
    ErrSubscriptionFailed = errors.New("submux: subscription failed")
    // ErrConnectionFailed is returned when a connection operation fails.
    ErrConnectionFailed = errors.New("submux: connection failed")
    // ErrClosed is returned when an operation is attempted on a closed SubMux.
    ErrClosed = errors.New("submux: SubMux is closed")
    // ErrEventLoopStopped is returned when an operation cannot complete because
    // the event loop goroutine has exited (due to Redis error or connection close).
    ErrEventLoopStopped = errors.New("submux: event loop stopped")
)
Functions ¶
func Hashslot ¶
Hashslot calculates the Redis hashslot for a channel name. It uses the same hashslot calculation as go-redis/v9 (from internal/hashtag package), which supports Redis hashtag syntax: if the channel contains {tag}, only the tag portion is used for hashslot calculation.
This implementation matches go-redis/v9's internal/hashtag.Slot function exactly.
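For readers who want to see the calculation itself, the following is a minimal sketch of the Redis Cluster slot formula: CRC-16/XMODEM of the hashtag (or of the whole name when there is no usable tag), modulo 16384. The function names crc16, hashtag, and slotFor are illustrative and are not part of submux; the sketch uses a bitwise CRC rather than a lookup table and ignores edge cases such as empty names.

```go
package main

import "strings"

// crc16 computes CRC-16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum variant used by Redis Cluster key hashing.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashtag returns the substring between the first '{' and the next '}'
// when that substring is non-empty; otherwise it returns the whole name.
func hashtag(name string) string {
	if open := strings.IndexByte(name, '{'); open >= 0 {
		if end := strings.IndexByte(name[open+1:], '}'); end > 0 {
			return name[open+1 : open+1+end]
		}
	}
	return name
}

// slotFor is a hypothetical stand-in for Hashslot: CRC16(tag) mod 16384.
func slotFor(channel string) int {
	return int(crc16(hashtag(channel))) % 16384
}
```

With this sketch, slotFor("123456789") is 12739 (the CRC check value 0x31C3 from the Redis Cluster specification), and "{user1000}.following" and "{user1000}.followers" land in the same slot because only "user1000" is hashed.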
Types ¶
type Message ¶
type Message struct {
    // Type indicates the message type.
    Type MessageType
    // Channel is the channel name (for regular messages).
    Channel string
    // Payload is the message payload.
    Payload string
    // Pattern is the pattern that matched (for PSUBSCRIBE/SSUBSCRIBE messages).
    Pattern string
    // Signal contains signal information (if Type is MessageTypeSignal).
    Signal *SignalInfo
    // Timestamp is when the message was received.
    Timestamp time.Time
    // SubscriptionType indicates the subscription method (subscribe, psubscribe, ssubscribe).
    // Used internally for metrics attribution.
    SubscriptionType subscriptionType
}
Message represents a message received from a subscription or a signal notification.
type MessageCallback ¶
MessageCallback is a function type for handling subscription events (messages and signal notifications). Callbacks are invoked asynchronously via a bounded worker pool and must be thread-safe. The context is derived from the SubMux lifecycle and is canceled when Close() is called.
type MessageType ¶
type MessageType int
MessageType represents the type of a message received from a subscription.
const (
    // MessageTypeMessage is a regular SUBSCRIBE message.
    MessageTypeMessage MessageType = iota
    // MessageTypePMessage is a pattern PSUBSCRIBE message.
    MessageTypePMessage
    // MessageTypeSMessage is a sharded SSUBSCRIBE message.
    MessageTypeSMessage
    // MessageTypeSignal is a signal notification (topology change).
    MessageTypeSignal
)
type NodePreference ¶
type NodePreference int
NodePreference determines the strategy for distributing subscriptions across cluster nodes.
const (
    // PreferMasters routes all subscriptions to master nodes only.
    // Use this when: You want to minimize the number of nodes involved in pub/sub,
    // or when masters have spare capacity and you want centralized routing.
    PreferMasters NodePreference = iota
    // BalancedAll distributes subscriptions equally across all nodes (masters and replicas) within each shard.
    // This is the recommended default for most workloads as it:
    // - Maximizes resource utilization across all infrastructure
    // - Provides better failure characteristics (smaller blast radius)
    // - Works well for both light and heavy pub/sub loads
    BalancedAll
    // PreferReplicas routes subscriptions to replica nodes, avoiding masters when possible.
    // Use this when: Masters are write-saturated and you need to protect them from additional pub/sub load.
    // Falls back to masters if no replicas are available.
    PreferReplicas
)
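The three strategies can be read as rules for which nodes of a shard are eligible to carry subscriptions. The following is a rough sketch of those rules as described in the comments above, assuming a shard is given as one master address plus a list of replica addresses; the lowercase names are hypothetical and this is not how submux necessarily selects nodes internally.

```go
package main

// nodePreference mirrors the three documented strategies for illustration only.
type nodePreference int

const (
	preferMasters nodePreference = iota
	balancedAll
	preferReplicas
)

// candidates returns the nodes of one shard that are eligible to carry
// subscriptions under the given preference.
func candidates(pref nodePreference, master string, replicas []string) []string {
	switch pref {
	case balancedAll:
		// Master and replicas are all eligible.
		return append([]string{master}, replicas...)
	case preferReplicas:
		if len(replicas) > 0 {
			// Copy so callers cannot mutate the input slice through the result.
			return append([]string(nil), replicas...)
		}
		// No replicas available: fall back to the master.
		return []string{master}
	default:
		// preferMasters: only the master is eligible.
		return []string{master}
	}
}
```

How subscriptions are then spread across the eligible nodes (round-robin, least-loaded, or something else) is not specified here and is left out of the sketch.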
type Option ¶
type Option func(*config)
Option is a function type for configuring a SubMux instance.
func WithAutoResubscribe ¶
WithAutoResubscribe enables automatic resubscription when hashslot migrations occur.
func WithCallbackQueueSize ¶
WithCallbackQueueSize sets the maximum number of pending callbacks in the worker pool queue. When the queue is full, new callbacks will block until space is available, providing backpressure to the message processing pipeline. Default is 10000.
func WithCallbackWorkers ¶
WithCallbackWorkers sets the number of worker goroutines in the callback worker pool. The worker pool bounds the number of concurrent callback executions, preventing goroutine explosion under high message throughput. Default is runtime.NumCPU() * 2.
func WithLogger ¶
WithLogger sets the structured logger to use.
func WithMeterProvider ¶
func WithMeterProvider(provider metric.MeterProvider) Option
WithMeterProvider sets the OpenTelemetry MeterProvider for metrics collection. Metrics are opt-in - if not provided, all metrics operations become no-ops with zero overhead.
Example usage with Prometheus:
import (
    "go.opentelemetry.io/otel/exporters/prometheus"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)
exporter, err := prometheus.New()
if err != nil {
    log.Fatal(err)
}
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
subMux, err := submux.New(clusterClient, submux.WithMeterProvider(provider))
if err != nil {
    log.Fatal(err)
}
func WithMigrationStallCheck ¶
WithMigrationStallCheck sets how often to check for stalled migration resubscription progress. Default is 2 seconds.
func WithMigrationTimeout ¶
WithMigrationTimeout sets the maximum duration to wait for migration resubscription to complete before timing out. Default is 30 seconds.
func WithNodePreference ¶
func WithNodePreference(preference NodePreference) Option
WithNodePreference sets the strategy for distributing subscriptions across cluster nodes. Available options:
- PreferMasters: Route subscriptions to master nodes only (legacy behavior)
- BalancedAll: Distribute equally across all nodes - masters and replicas (recommended default)
- PreferReplicas: Prefer replicas to protect write-saturated masters
func WithReplicaPreference ¶
WithReplicaPreference sets preference for using replica nodes over master nodes. Deprecated: Use WithNodePreference(PreferReplicas) instead for clearer intent, or WithNodePreference(BalancedAll) for better default behavior.
func WithSubscriptionQueueLimit ¶
WithSubscriptionQueueLimit sets the maximum number of messages that can be queued per subscription before messages are dropped (tail-drop). When the limit is reached, new messages are discarded and an EventQueueOverflow signal is delivered to the callback. This limit can be overridden per-subscription using WithQueueLimit on SubscribeOption. Set to 0 for unlimited (no dropping). Default is 100.
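Tail-drop means the queue keeps what it already holds and discards the newest arrivals once it is full. The sketch below illustrates that policy with a counter of discarded messages; dropQueue and push are hypothetical names, and the sketch does not model when submux resets its count or how it delivers the overflow signal.

```go
package main

// dropQueue is a hypothetical per-subscription buffer with a tail-drop policy.
type dropQueue struct {
	limit   int // maximum queued messages; 0 means unlimited
	items   []string
	dropped int // messages discarded because the queue was full
}

// push appends msg unless the queue is at its limit. It reports whether the
// message was queued; a false result means the message was dropped.
func (q *dropQueue) push(msg string) bool {
	if q.limit > 0 && len(q.items) >= q.limit {
		q.dropped++
		return false
	}
	q.items = append(q.items, msg)
	return true
}
```

The dropped field plays the role that DroppedCount plays in an overflow signal: it tells the consumer how much was lost rather than silently discarding messages.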
func WithTopologyPollInterval ¶
WithTopologyPollInterval sets how often to poll the cluster topology for changes. The minimum recommended interval is 1 second. Shorter intervals may increase load on the Redis cluster.
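One way to picture what a poll can detect is a comparison of two slot-to-node snapshots taken at consecutive polls. The following is an illustrative sketch only: migration and diffTopology are hypothetical names, the snapshots are plain maps, and submux's actual detection logic may differ.

```go
package main

import "sort"

// migration is a hypothetical record mirroring the Hashslot, OldNode, and
// NewNode fields of SignalInfo.
type migration struct {
	Hashslot int
	OldNode  string
	NewNode  string
}

// diffTopology compares two slot -> node snapshots and reports every slot
// whose owner changed, ordered by slot number.
func diffTopology(prev, curr map[int]string) []migration {
	var out []migration
	for slot, newNode := range curr {
		if oldNode, ok := prev[slot]; ok && oldNode != newNode {
			out = append(out, migration{Hashslot: slot, OldNode: oldNode, NewNode: newNode})
		}
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Slice(out, func(i, j int) bool { return out[i].Hashslot < out[j].Hashslot })
	return out
}
```

A shorter poll interval shrinks the window between a slot moving and the change being noticed, at the cost of more topology queries against the cluster.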
type SignalInfo ¶
type SignalInfo struct {
    // EventType is the type of event: "migration", "topology_change", "node_failure".
    EventType EventType
    // Hashslot is the affected hashslot (0-16383).
    Hashslot int
    // OldNode is the previous node address (if applicable).
    OldNode string
    // NewNode is the new node address (if applicable).
    NewNode string
    // Details contains additional details about the event.
    Details string
    // DroppedCount is the number of messages dropped since the overflow episode began.
    // Only meaningful when EventType is EventQueueOverflow.
    DroppedCount int
}
SignalInfo contains information about cluster topology changes or hashslot migrations.
type Sub ¶
type Sub struct {
    // contains filtered or unexported fields
}
Sub represents a subscription to one or more channels/patterns with a specific callback. Sub is returned from SubscribeSync, PSubscribeSync, or SSubscribeSync and can be used to unsubscribe the specific callback that was provided during subscription.
type SubMux ¶
type SubMux struct {
    // contains filtered or unexported fields
}
SubMux provides connection multiplexing for Redis Cluster Pub/Sub operations.
func New ¶
func New(clusterClient *redis.ClusterClient, opts ...Option) (*SubMux, error)
New creates a new SubMux instance wrapping a ClusterClient.
func (*SubMux) Close ¶
Close closes all subscriptions and connections, stops all goroutines, and releases resources. It is safe to call multiple times.
func (*SubMux) PSubscribeSync ¶
func (sm *SubMux) PSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)
PSubscribeSync subscribes to one or more channel patterns using pattern subscription. It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.
func (*SubMux) SSubscribeSync ¶
func (sm *SubMux) SSubscribeSync(ctx context.Context, patterns []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)
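SSubscribeSync subscribes to one or more shard channels using sharded subscription (Redis 7.0+). Although the parameter is named patterns, the SSUBSCRIBE command takes literal channel names rather than glob patterns. It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.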
func (*SubMux) SubscribeSync ¶
func (sm *SubMux) SubscribeSync(ctx context.Context, channels []string, callback MessageCallback, opts ...SubscribeOption) (*Sub, error)
SubscribeSync subscribes to one or more channels using regular channel subscription. It waits for the subscription to be confirmed before returning. Returns a Sub that can be used to unsubscribe the provided callback.
type SubscribeOption ¶
type SubscribeOption func(*subscribeConfig)
SubscribeOption configures per-subscription behavior. Options are passed to SubscribeSync, PSubscribeSync, or SSubscribeSync.
func WithQueueLimit ¶
func WithQueueLimit(limit int) SubscribeOption
WithQueueLimit overrides the default subscription queue limit for this subscription. When the queue reaches the limit, new messages are dropped (tail-drop) and an EventQueueOverflow signal is delivered to the callback. Set to 0 for unlimited (no dropping). If not called, the SubMux-level default from WithSubscriptionQueueLimit is used.
type WorkerPool ¶
type WorkerPool struct {
    // contains filtered or unexported fields
}
WorkerPool manages a bounded pool of worker goroutines for executing tasks. It provides backpressure when the queue is full, preventing goroutine explosion under high load.
func NewWorkerPool ¶
func NewWorkerPool(workers, queueSize int) *WorkerPool
NewWorkerPool creates a new worker pool with the specified number of workers and queue size. If workers is 0, it defaults to runtime.NumCPU() * 2. If queueSize is 0, it defaults to 10000.
func (*WorkerPool) Context ¶
func (wp *WorkerPool) Context() context.Context
Context returns the worker pool's context. This context is canceled when Stop() is called.
func (*WorkerPool) QueueCapacity ¶
func (wp *WorkerPool) QueueCapacity() int
QueueCapacity returns the maximum queue capacity.
func (*WorkerPool) QueueLength ¶
func (wp *WorkerPool) QueueLength() int
QueueLength returns the current number of tasks in the queue.
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start()
Start starts the worker goroutines. It is safe to call multiple times.
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
Stop stops the worker pool gracefully, waiting for all workers to finish. Tasks still in the queue will be processed before workers exit. It is safe to call multiple times.
func (*WorkerPool) Submit ¶
func (wp *WorkerPool) Submit(task func()) bool
Submit submits a task to the worker pool. It blocks if the queue is full (providing backpressure). Returns false if the pool is stopped or the context is canceled.
func (*WorkerPool) SubmitWithContext ¶
func (wp *WorkerPool) SubmitWithContext(ctx context.Context, task func()) bool
SubmitWithContext submits a task with a context for cancellation. Returns false if the pool is stopped, the context is canceled, or the provided context is done.
func (*WorkerPool) TrySubmit ¶
func (wp *WorkerPool) TrySubmit(task func()) bool
TrySubmit attempts to submit a task without blocking. Returns true if the task was submitted, false if the queue is full or the pool is stopped.
func (*WorkerPool) Workers ¶
func (wp *WorkerPool) Workers() int
Workers returns the number of worker goroutines.