bus

package module
v0.14.0
Published: Apr 24, 2026 License: BSD-2-Clause Imports: 13 Imported by: 13

README

Vinculum Bus

"The [vinculum is the] processing device at the core of every Borg vessel. It interconnects the minds of all the drones." -- Seven of Nine (In Voyager episode "Infinite Regress")

vinculum-bus is the core event bus functionality of Vinculum. It's a high-performance, feature-rich in-process EventBus for Go with MQTT-style topic patterns and optional observability. It does not depend on any other Vinculum projects.

✨ Features

🚀 Core EventBus
  • MQTT-style topics with wildcards (+ single-level, # multi-level)
  • Parameter extraction from topic patterns (users/+userId/events)
  • Async & sync publishing with error handling
  • Thread-safe with minimal locking design
  • Graceful shutdown and lifecycle management
📊 Observability
  • Optional OpenTelemetry integration (metrics + tracing)
  • Standalone metrics provider (publishes to $metrics topic)
  • Minimal overhead when observability disabled
  • Comprehensive instrumentation (counters, histograms, gauges)
  • Interface-based design for custom providers
🔧 Developer Experience
  • Simple API with intuitive interfaces
  • Structured logging with zap integration
  • Comprehensive test suite with 83%+ coverage
  • Type-safe subscriber patterns
  • Mock-friendly for testing
Performance
  • Configurable buffering (1000 messages default) for burst handling
  • Atomic operations for counters
  • Lazy metric creation for efficiency
  • Context propagation for distributed tracing
  • Minimal allocations in hot paths

🚀 Quick Start

package main

import (
    "context"
    "log"

    bus "github.com/tsarna/vinculum-bus"
    "github.com/tsarna/vinculum-bus/subutils"
    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewProduction()
    ctx := context.Background()

    // Create and start EventBus using builder pattern
    eventBus, err := bus.NewEventBus().
        WithLogger(logger).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    if err := eventBus.Start(); err != nil {
        log.Fatal(err)
    }
    defer eventBus.Stop()

    // Create subscriber (nil = standalone logging subscriber)
    subscriber := subutils.NewNamedLoggingSubscriber(nil, logger, zap.InfoLevel, "MyService")

    // Subscribe to topic pattern (context propagation enabled).
    // Argument order follows the EventBus interface: ctx, topic, subscriber.
    eventBus.Subscribe(ctx, "users/+userId/events", subscriber)

    // Publish messages with context
    eventBus.Publish(ctx, "users/123/events", "User logged in")
    eventBus.PublishSync(ctx, "users/456/events", "User created account")
}

📚 API Reference

EventBus Interface
type EventBus interface {
    Start() error
    Stop() error
    
    Subscribe(ctx context.Context, topic string, subscriber Subscriber) error
    Unsubscribe(ctx context.Context, topic string, subscriber Subscriber) error
    UnsubscribeAll(ctx context.Context, subscriber Subscriber) error
    
    Publish(ctx context.Context, topic string, payload any) error      // Async, fire-and-forget
    PublishSync(ctx context.Context, topic string, payload any) error  // Sync, waits for completion
}
Subscriber Interface
type Subscriber interface {
    OnSubscribe(ctx context.Context, topic string) error
    OnUnsubscribe(ctx context.Context, topic string) error
    OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error
}
Creating EventBus
Builder Pattern
// Basic EventBus
eventBus, err := bus.NewEventBus().
    WithLogger(logger).
    Build()
if err != nil {
    return err
}

// With custom buffer size
eventBus, err := bus.NewEventBus().
    WithLogger(logger).
    WithBufferSize(2000).
    Build()
if err != nil {
    return err
}

// With observability
eventBus, err := bus.NewEventBus().
    WithLogger(logger).
    WithMeterProvider(meterProvider).
    WithTracerProvider(tracerProvider).
    WithServiceInfo("my-service", "v1.0.0").
    WithBufferSize(1500).
    Build()
if err != nil {
    return err
}

// Without logger (uses nop logger)
eventBus, err := bus.NewEventBus().
    WithBufferSize(1000).
    Build()
if err != nil {
    return err
}

🎯 Topic Patterns

Wildcards
  • + - Single-level wildcard (sensors/+/temperature)
  • # - Multi-level wildcard (logs/#)
Parameter Extraction
// Subscribe to pattern
eventBus.Subscribe(ctx, "users/+userId/orders/+orderId", subscriber)

// Publish message
eventBus.Publish(ctx, "users/123/orders/456", orderData)

// Subscriber receives:
// topic = "users/123/orders/456"
// fields = {"userId": "123", "orderId": "456"}
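To make the extraction rule concrete, here is a minimal stdlib-only sketch of how named single-level wildcards could be turned into a fields map. The function name extractFields is hypothetical and this is not the library's matcher (the bus delegates to the topicmatch sub-package); it only illustrates the segment-by-segment idea.

```go
package main

import (
	"fmt"
	"strings"
)

// extractFields matches topic against an MQTT-style pattern and returns the
// values captured by named single-level wildcards such as "+userId".
// Hypothetical helper for illustration only; not part of vinculum-bus.
func extractFields(pattern, topic string) (map[string]string, bool) {
	pSegs := strings.Split(pattern, "/")
	tSegs := strings.Split(topic, "/")
	fields := make(map[string]string)

	for i, p := range pSegs {
		// "#" matches everything that remains, including nothing at all.
		if p == "#" {
			return fields, true
		}
		if i >= len(tSegs) {
			return nil, false // topic is shorter than the pattern
		}
		if strings.HasPrefix(p, "+") {
			// "+" alone matches one level; "+name" also records its value.
			if name := p[1:]; name != "" {
				fields[name] = tSegs[i]
			}
			continue
		}
		if p != tSegs[i] {
			return nil, false // literal segment mismatch
		}
	}
	if len(tSegs) != len(pSegs) {
		return nil, false // topic has extra trailing levels
	}
	return fields, true
}

func main() {
	fields, ok := extractFields("users/+userId/orders/+orderId", "users/123/orders/456")
	fmt.Println(fields, ok)
}
```

The sketch ignores the reserved $-topic rule described in the next section.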
Reserved $-prefixed topics

Topics beginning with $ are reserved for server/system use (e.g. $metrics for the standalone metrics provider). Following MQTT 5.0 §4.7.2, a topic filter starting with a wildcard character (+ or #) will not match a topic beginning with $. To receive these topics, subscribe either exactly ("$metrics") or with a pattern whose first segment is $-prefixed ("$sys/#").

Topic pattern matching across the bus and its bridges is consolidated in the topicmatch sub-package, which wraps mqttpattern and enforces this rule. Direct callers outside the bus should prefer topicmatch over mqttpattern.
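The $-topic rule itself is small enough to sketch in a few lines. The helper below is a hypothetical illustration of the MQTT 5.0 §4.7.2 check described above, written against the standard library only; it is not the topicmatch API, whose function names are not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// blockedByDollarRule reports whether a filter is forbidden from matching a
// topic: the topic starts with "$" and the filter starts with a wildcard.
// Hypothetical helper; a real matcher would apply this before pattern matching.
func blockedByDollarRule(filter, topic string) bool {
	return strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(filter, "+") || strings.HasPrefix(filter, "#"))
}

func main() {
	fmt.Println(blockedByDollarRule("#", "$metrics"))                        // true
	fmt.Println(blockedByDollarRule("$metrics", "$metrics"))                 // false
	fmt.Println(blockedByDollarRule("$sys/#", "$sys/uptime"))                // false
	fmt.Println(blockedByDollarRule("+/temperature", "sensors/temperature")) // false
}
```

One practical consequence: a catch-all "#" subscription does not receive $metrics snapshots, so a metrics collector has to subscribe to "$metrics" explicitly.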

📊 Observability

OpenTelemetry Metrics

The EventBus accepts a standard OTel metric.MeterProvider. Metrics follow OTel semantic conventions (messaging.client.* where applicable).

eventBus, err := bus.NewEventBus().
    WithLogger(logger).
    WithMeterProvider(meterProvider).
    WithServiceInfo("my-service", "v1.0.0").
    Build()
Standalone Metrics (publish to bus)
import "github.com/tsarna/vinculum-bus/o11y"

// Creates an sdkmetric.MeterProvider that periodically exports metrics to a bus topic
mp, exporter := o11y.NewStandaloneMeterProvider(eventBus, &o11y.StandaloneMetricsConfig{
    Interval:     30 * time.Second,  // Publish every 30s
    MetricsTopic: "$metrics",        // Topic for metrics
    ServiceName:  "my-service",
})
defer mp.Shutdown(ctx)

// Use the MeterProvider with the EventBus
observableEventBus, _ := bus.NewEventBus().
    WithMeterProvider(mp).
    Build()

// Subscribe to metrics snapshots
eventBus.Subscribe(ctx, "$metrics", metricsCollector)
Metrics Snapshot Format
{
  "timestamp": "2025-08-28T23:02:30.773505-04:00",
  "service_name": "my-service",
  "counters": {
    "messaging.client.sent.messages": 175,
    "eventbus.subscriptions": 12,
    "messaging.client.errors": 0
  },
  "histograms": {
    "messaging.client.operation.duration": {
      "count": 25,
      "sum": 0.045,
      "bounds": [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10],
      "bucket_counts": [10, 8, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    }
  },
  "gauges": {
    "eventbus.active_subscribers": 8
  }
}
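A consumer that receives a snapshot in the JSON form shown above could decode it roughly as follows. This is a sketch under assumptions: the struct and function names are hypothetical, and the page does not say whether the payload delivered on $metrics is JSON bytes or a Go value, so treat the decoding step as illustrative. The check that bucket_counts has one more entry than bounds reflects the sample above (14 bounds, 15 counts).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sample is a shortened snapshot in the format shown above.
const sample = `{
  "timestamp": "2025-08-28T23:02:30.773505-04:00",
  "service_name": "my-service",
  "counters": {"messaging.client.sent.messages": 175},
  "histograms": {
    "messaging.client.operation.duration": {
      "count": 25, "sum": 0.045,
      "bounds": [0.005, 0.01, 0.025],
      "bucket_counts": [10, 8, 5, 2]
    }
  },
  "gauges": {"eventbus.active_subscribers": 8}
}`

// histogram and snapshot are hypothetical types mirroring the JSON keys.
type histogram struct {
	Count        uint64    `json:"count"`
	Sum          float64   `json:"sum"`
	Bounds       []float64 `json:"bounds"`
	BucketCounts []uint64  `json:"bucket_counts"`
}

type snapshot struct {
	Timestamp   string               `json:"timestamp"`
	ServiceName string               `json:"service_name"`
	Counters    map[string]float64   `json:"counters"`
	Histograms  map[string]histogram `json:"histograms"`
	Gauges      map[string]float64   `json:"gauges"`
}

// parseSnapshot decodes a snapshot and verifies that every histogram has
// exactly one more bucket count than it has bounds (the overflow bucket).
func parseSnapshot(data []byte) (snapshot, error) {
	var s snapshot
	if err := json.Unmarshal(data, &s); err != nil {
		return s, err
	}
	for name, h := range s.Histograms {
		if len(h.BucketCounts) != len(h.Bounds)+1 {
			return s, fmt.Errorf("histogram %q: %d bounds but %d bucket counts",
				name, len(h.Bounds), len(h.BucketCounts))
		}
	}
	return s, nil
}

func main() {
	s, err := parseSnapshot([]byte(sample))
	if err != nil {
		fmt.Println("invalid snapshot:", err)
		return
	}
	fmt.Println(s.ServiceName, s.Counters["messaging.client.sent.messages"])
}
```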

🧪 Testing

Built-in Test Utilities
// Mock subscriber for testing
mockSub := &bus.MockSubscriber{}
eventBus.Subscribe(ctx, "test/+param", mockSub)

// PublishSync waits for delivery, so the assertions below cannot race the bus
eventBus.PublishSync(ctx, "test/value", "message")

// Verify events
events := mockSub.GetEvents()
assert.Equal(t, 1, len(events))
assert.Equal(t, "test/value", events[0].Topic)
assert.Equal(t, "value", events[0].Fields["param"])
Logging Subscriber for Debugging
// Logs all events with structured data (standalone mode)
debugSub := subutils.NewNamedLoggingSubscriber(nil, logger, zap.DebugLevel, "Debug")
eventBus.Subscribe(ctx, "#", debugSub) // Subscribe to everything except reserved $-prefixed topics

// Or wrap another subscriber to add logging
// wrappedSub := subutils.NewNamedLoggingSubscriber(mySubscriber, logger, zap.DebugLevel, "Debug")

🏗️ Architecture

Design Principles
  • Interface-based - Easy to mock and extend
  • Dependency inversion - Core doesn't depend on observability
  • Zero-cost abstractions - No overhead when features not used
  • Thread-safe - Safe for concurrent use
  • Graceful degradation - Continues working on errors
Performance Characteristics

Note: this README was written almost entirely by Claude and makes bold claims, but Claude did run the benchmarks and measured the numbers below on a 2021 M1 Max MacBook.

  • ~110ns per publish operation (no observability, 0 allocations)
  • ~529ns per publish operation (with OpenTelemetry observability, 12 allocations)
  • ~180ns per publish operation (with standalone metrics, 1 allocation)
  • ~706ns per PublishSync operation (waits for completion, 3 allocations)
  • 4.6+ million messages/second throughput capability
  • Zero allocations in async publish hot path (no observability)
  • Optimized hot path with minimal overhead
  • Configurable buffering (default: 1000 messages) with message dropping for backpressure
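The last bullet mentions dropping messages under backpressure. The page does not show how the bus does this internally, so the following is only a sketch of the common Go idiom for it: a non-blocking send on a buffered channel. The name tryEnqueue is hypothetical.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the message
// was accepted. Hypothetical helper; not the bus's actual implementation.
func tryEnqueue(queue chan<- string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		// Buffer is full: drop the message instead of blocking the publisher.
		return false
	}
}

func main() {
	queue := make(chan string, 2) // tiny buffer so the drop is easy to see
	for i := 1; i <= 3; i++ {
		accepted := tryEnqueue(queue, fmt.Sprintf("msg-%d", i))
		fmt.Println(i, accepted)
	}
}
```

With nothing draining the channel, the first two sends are accepted and the third is dropped. The trade-off is that publishers never stall, at the cost of losing messages during sustained overload.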

⚙️ Configuration

Buffer Size Configuration

Control the internal channel buffer size to handle burst traffic:

// Custom buffer size for high-throughput scenarios
eventBus, err := bus.NewEventBus().
    WithLogger(logger).
    WithBufferSize(5000). // Default is 1000
    Build()
if err != nil {
    return err
}

Buffer Size Guidelines:

  • Default (1000): Good for most applications (~0.2ms burst capacity)
  • Small (100-500): Memory-constrained environments
  • Large (2000+): High-throughput, burst-heavy workloads
  • Very Large (10000+): Extreme burst scenarios (consider batching instead)

📖 Examples

Basic Pub/Sub
subscriber := &MySubscriber{}
eventBus.Subscribe(ctx, "notifications/+type", subscriber)
eventBus.Publish(ctx, "notifications/email", emailData)
Error Handling
err := eventBus.PublishSync(ctx, "critical/operation", data)
if err != nil {
    logger.Error("Critical operation failed", zap.Error(err))
}
Graceful Shutdown
// Start EventBus
eventBus.Start()

// Handle shutdown signal
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c

// Graceful shutdown
eventBus.Stop() // Waits for in-flight messages

🎯 Use Cases

  • Microservice communication within a process
  • Event-driven architectures
  • Decoupled component communication
  • Real-time data processing pipelines
  • Plugin systems with event coordination
  • Application telemetry and monitoring

🔗 Dependencies

Core (Required)
  • go.uber.org/zap - Structured logging
  • github.com/amir-yaghoubi/mqttpattern - Topic pattern matching (wrapped by the topicmatch sub-package to enforce MQTT $-topic rules)
Observability (Optional)
  • github.com/tsarna/vinculum-bus/o11y - Observability interfaces and standalone metrics
  • github.com/tsarna/vinculum-bus/otel - OpenTelemetry integration
  • go.opentelemetry.io/otel - OpenTelemetry SDK (when using otel package)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AutoReconnector

type AutoReconnector struct {
	*SubscriptionTracker
	// contains filtered or unexported fields
}

AutoReconnector is a ClientMonitor that automatically reconnects on error disconnects with exponential backoff and resubscribes to previous topics.

Features:

  • Automatic reconnection on error disconnects (but not clean disconnects)
  • Exponential backoff with configurable parameters
  • Automatic resubscription to previous topic patterns after reconnect
  • Configurable maximum retry attempts
  • Thread-safe operation

Example usage:

// Default configuration (with no-op logger)
reconnector := bus.NewAutoReconnector().Build()

// Custom configuration with logger
logger, _ := zap.NewProduction()
reconnector := bus.NewAutoReconnector().
	WithInitialDelay(500 * time.Millisecond).
	WithMaxDelay(10 * time.Second).
	WithBackoffFactor(1.5).
	WithMaxRetries(5).
	WithEnabled(true).
	WithLogger(logger).
	Build()

client, err := client.NewClient().
	WithURL("ws://localhost:8080/ws").
	WithSubscriber(subscriber).
	WithMonitor(reconnector).
	Build()

// The client will now automatically reconnect on connection failures
// and restore all previous subscriptions

func (*AutoReconnector) GetLastReconnectTime

func (a *AutoReconnector) GetLastReconnectTime() time.Time

GetLastReconnectTime returns when the last reconnection attempt was made.

func (*AutoReconnector) GetReconnectCount

func (a *AutoReconnector) GetReconnectCount() int

GetReconnectCount returns the number of reconnection attempts made.

func (*AutoReconnector) IsEnabled

func (a *AutoReconnector) IsEnabled() bool

IsEnabled returns whether automatic reconnection is currently enabled.

func (*AutoReconnector) OnConnect

func (a *AutoReconnector) OnConnect(ctx context.Context, client Client)

OnConnect implements ClientMonitor.OnConnect and delegates to SubscriptionTracker.

func (*AutoReconnector) OnDisconnect

func (a *AutoReconnector) OnDisconnect(ctx context.Context, client Client, err error)

OnDisconnect implements ClientMonitor.OnDisconnect and triggers reconnection on error disconnects.

func (*AutoReconnector) OnSubscribe

func (a *AutoReconnector) OnSubscribe(ctx context.Context, client Client, topicPattern string)

OnSubscribe implements ClientMonitor.OnSubscribe and delegates to SubscriptionTracker.

func (*AutoReconnector) OnUnsubscribe

func (a *AutoReconnector) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)

OnUnsubscribe implements ClientMonitor.OnUnsubscribe and delegates to SubscriptionTracker.

func (*AutoReconnector) OnUnsubscribeAll

func (a *AutoReconnector) OnUnsubscribeAll(ctx context.Context, client Client)

OnUnsubscribeAll implements ClientMonitor.OnUnsubscribeAll and delegates to SubscriptionTracker.

func (*AutoReconnector) SetEnabled

func (a *AutoReconnector) SetEnabled(enabled bool)

SetEnabled enables or disables automatic reconnection.

type AutoReconnectorBuilder

type AutoReconnectorBuilder struct {
	// contains filtered or unexported fields
}

AutoReconnectorBuilder provides a fluent interface for configuring AutoReconnector.

func NewAutoReconnector

func NewAutoReconnector() *AutoReconnectorBuilder

NewAutoReconnector creates a new AutoReconnectorBuilder with default configuration.

func (*AutoReconnectorBuilder) Build

Build creates the configured AutoReconnector.

func (*AutoReconnectorBuilder) WithBackoffFactor

func (b *AutoReconnectorBuilder) WithBackoffFactor(factor float64) *AutoReconnectorBuilder

WithBackoffFactor sets the multiplier for exponential backoff.

func (*AutoReconnectorBuilder) WithEnabled

func (b *AutoReconnectorBuilder) WithEnabled(enabled bool) *AutoReconnectorBuilder

WithEnabled sets whether automatic reconnection is initially enabled.

func (*AutoReconnectorBuilder) WithInitialDelay

func (b *AutoReconnectorBuilder) WithInitialDelay(delay time.Duration) *AutoReconnectorBuilder

WithInitialDelay sets the initial delay before the first reconnection attempt.

func (*AutoReconnectorBuilder) WithLogger

func (b *AutoReconnectorBuilder) WithLogger(logger *zap.Logger) *AutoReconnectorBuilder

WithLogger sets the logger for reconnection events.

func (*AutoReconnectorBuilder) WithMaxDelay

WithMaxDelay sets the maximum delay between reconnection attempts.

func (*AutoReconnectorBuilder) WithMaxRetries

func (b *AutoReconnectorBuilder) WithMaxRetries(retries int) *AutoReconnectorBuilder

WithMaxRetries sets the maximum number of reconnection attempts. Set to -1 for unlimited retries.

type BaseSubscriber

type BaseSubscriber struct {
}

func (*BaseSubscriber) OnEvent

func (b *BaseSubscriber) OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error

func (*BaseSubscriber) OnSubscribe

func (b *BaseSubscriber) OnSubscribe(ctx context.Context, topic string) error

func (*BaseSubscriber) OnUnsubscribe

func (b *BaseSubscriber) OnUnsubscribe(ctx context.Context, topic string) error

func (*BaseSubscriber) PassThrough

func (b *BaseSubscriber) PassThrough(msg EventBusMessage) error

type Client

type Client interface {
	Subscriber

	Connect(ctx context.Context) error
	Disconnect() error

	Subscribe(ctx context.Context, topicPattern string) error
	Unsubscribe(ctx context.Context, topicPattern string) error
	UnsubscribeAll(ctx context.Context) error

	Publish(ctx context.Context, topic string, payload any) error
	PublishSync(ctx context.Context, topic string, payload any) error
}

type ClientMonitor

type ClientMonitor interface {
	OnConnect(ctx context.Context, client Client)
	OnDisconnect(ctx context.Context, client Client, err error)
	OnSubscribe(ctx context.Context, client Client, topic string)
	OnUnsubscribe(ctx context.Context, client Client, topic string)
	OnUnsubscribeAll(ctx context.Context, client Client)
}

type EventBus

type EventBus interface {
	Subscriber // An EventBus can be subscribed to other event buses

	Start() error
	Stop() error

	Subscribe(ctx context.Context, topic string, subscriber Subscriber) error
	SubscribeFunc(ctx context.Context, topic string, receiver EventReceiver) (Subscriber, error)
	Unsubscribe(ctx context.Context, topic string, subscriber Subscriber) error
	UnsubscribeAll(ctx context.Context, subscriber Subscriber) error

	Publish(ctx context.Context, topic string, payload any) error
	PublishSync(ctx context.Context, topic string, payload any) error
}

type EventBusBuilder

type EventBusBuilder struct {
	// contains filtered or unexported fields
}

EventBusBuilder provides a fluent interface for creating EventBus instances

func NewEventBus

func NewEventBus() *EventBusBuilder

NewEventBus creates a new EventBusBuilder

func (*EventBusBuilder) Build

func (b *EventBusBuilder) Build() (EventBus, error)

Build creates and returns the EventBus instance, returning an error if configuration is invalid

func (*EventBusBuilder) IsValid

func (b *EventBusBuilder) IsValid() error

IsValid validates the builder configuration and returns an error if invalid

func (*EventBusBuilder) WithBufferSize

func (b *EventBusBuilder) WithBufferSize(size int) *EventBusBuilder

WithBufferSize sets the channel buffer size for the EventBus

func (*EventBusBuilder) WithLogger

func (b *EventBusBuilder) WithLogger(logger *zap.Logger) *EventBusBuilder

WithLogger sets the logger for the EventBus

func (*EventBusBuilder) WithMeterProvider added in v0.11.0

func (b *EventBusBuilder) WithMeterProvider(provider metric.MeterProvider) *EventBusBuilder

WithMeterProvider sets the OTel MeterProvider for the EventBus

func (*EventBusBuilder) WithName

func (b *EventBusBuilder) WithName(name string) *EventBusBuilder

WithName sets the name for the EventBus

func (*EventBusBuilder) WithServiceInfo

func (b *EventBusBuilder) WithServiceInfo(name, version string) *EventBusBuilder

WithServiceInfo sets service name and version for observability

func (*EventBusBuilder) WithTracerProvider added in v0.10.0

func (b *EventBusBuilder) WithTracerProvider(provider trace.TracerProvider) *EventBusBuilder

WithTracerProvider sets the OTel TracerProvider for the EventBus

type EventBusMessage

type EventBusMessage struct {
	Ctx     context.Context
	MsgType MessageType
	Topic   string
	Payload any
	Fields  map[string]string
}

EventBusMessage represents a message in the event bus with its context and metadata.

Fields carries subscriber-local delivery metadata (e.g. topic pattern extractions, enrichment added by transforms on the final hop). It is intentionally stripped when a message is published to the bus: these fields do not propagate across hops. The bus publish/subscribe paths construct EventBusMessage values without populating Fields; Fields is set by the delivery path (subscribers, transforms, or the caller of transform.ApplyTransforms) to carry per-delivery context.

type EventReceiver added in v0.9.0

type EventReceiver func(ctx context.Context, topic string, message any, fields map[string]string) error

type MessageType

type MessageType int

MessageType represents the type of message in the event bus

const (
	MessageTypeEvent MessageType = iota
	MessageTypeSubscribe
	MessageTypeSubscribeWithExtraction
	MessageTypeUnsubscribe
	MessageTypeUnsubscribeAll
	MessageTypeEventSync

	MessageTypeOnSubscribe
	MessageTypeOnUnsubscribe
	MessageTypePassThrough
	MessageTypeTick
)

type Subscriber

type Subscriber interface {
	OnSubscribe(ctx context.Context, topic string) error
	OnUnsubscribe(ctx context.Context, topic string) error
	OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error

	// PassThrough is used to pass the message to the next subscriber in the chain, eg to handle
	// cases like request responses.
	PassThrough(msg EventBusMessage) error
}

Note: Unless a subscriber is subscribed to multiple buses and/or async queueing wrappers, it will only be called from one goroutine, so it doesn't need to worry about concurrent calls.

func NewEventReceiver added in v0.9.0

func NewEventReceiver(receiver EventReceiver) Subscriber

type SubscriptionTracker

type SubscriptionTracker struct {
	// contains filtered or unexported fields
}

SubscriptionTracker is a sample ClientMonitor implementation that tracks client connection state and active subscriptions.

Example usage:

tracker := bus.NewSubscriptionTracker()
client, err := client.NewClient().
	WithURL("ws://localhost:8080/ws").
	WithSubscriber(subscriber).
	WithMonitor(tracker).
	Build()

// Later, check connection and subscription status
if tracker.IsConnected() {
	subscriptions := tracker.GetSubscriptions()
	fmt.Printf("Connected with %d active subscriptions: %v\n",
		len(subscriptions), subscriptions)
}

func NewSubscriptionTracker

func NewSubscriptionTracker() *SubscriptionTracker

NewSubscriptionTracker creates a new subscription tracker.

func (*SubscriptionTracker) GetConnectionTime

func (s *SubscriptionTracker) GetConnectionTime() time.Time

GetConnectionTime returns when the client last connected. Returns zero time if never connected.

func (*SubscriptionTracker) GetDisconnectionTime

func (s *SubscriptionTracker) GetDisconnectionTime() time.Time

GetDisconnectionTime returns when the client last disconnected. Returns zero time if never disconnected.

func (*SubscriptionTracker) GetLastError

func (s *SubscriptionTracker) GetLastError() error

GetLastError returns the error from the last disconnect, if any. Returns nil for graceful disconnects.

func (*SubscriptionTracker) GetSubscriptionCount

func (s *SubscriptionTracker) GetSubscriptionCount() int

GetSubscriptionCount returns the number of active subscriptions.

func (*SubscriptionTracker) GetSubscriptionTime

func (s *SubscriptionTracker) GetSubscriptionTime(topicPattern string) time.Time

GetSubscriptionTime returns when the client subscribed to the given topic. Returns zero time if not subscribed to the topic.

func (*SubscriptionTracker) GetSubscriptionTopics

func (s *SubscriptionTracker) GetSubscriptionTopics() []string

GetSubscriptionTopics returns a slice of all active subscription topics.

func (*SubscriptionTracker) GetSubscriptions

func (s *SubscriptionTracker) GetSubscriptions() map[string]time.Time

GetSubscriptions returns a copy of all active subscriptions with their subscription times.

func (*SubscriptionTracker) IsConnected

func (s *SubscriptionTracker) IsConnected() bool

IsConnected returns true if the client is currently connected.

func (*SubscriptionTracker) IsSubscribedTo

func (s *SubscriptionTracker) IsSubscribedTo(topicPattern string) bool

IsSubscribedTo returns true if the client is subscribed to the given topic pattern.

func (*SubscriptionTracker) OnConnect

func (s *SubscriptionTracker) OnConnect(ctx context.Context, client Client)

OnConnect implements ClientMonitor.OnConnect

func (*SubscriptionTracker) OnDisconnect

func (s *SubscriptionTracker) OnDisconnect(ctx context.Context, client Client, err error)

OnDisconnect implements ClientMonitor.OnDisconnect

func (*SubscriptionTracker) OnSubscribe

func (s *SubscriptionTracker) OnSubscribe(ctx context.Context, client Client, topicPattern string)

OnSubscribe implements ClientMonitor.OnSubscribe

func (*SubscriptionTracker) OnUnsubscribe

func (s *SubscriptionTracker) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)

OnUnsubscribe implements ClientMonitor.OnUnsubscribe

func (*SubscriptionTracker) OnUnsubscribeAll

func (s *SubscriptionTracker) OnUnsubscribeAll(ctx context.Context, client Client)

OnUnsubscribeAll implements ClientMonitor.OnUnsubscribeAll

Directories

Path        Synopsis
topicmatch  Package topicmatch wraps github.com/amir-yaghoubi/mqttpattern to enforce the MQTT 5.0 §4.7.2 rule: a topic filter starting with a wildcard character (+ or #) MUST NOT match a topic name beginning with $.
