README
Vinculum
"The [vinculum is the] processing device at the core of every Borg vessel. It interconnects the minds of all the drones." -- Seven of Nine (In Voyager episode "Infinite Regress")
Vinculum is several things:
- A high-performance, feature-rich in-process EventBus for Go with MQTT-style topic patterns and optional observability.
- An optional, simple, JSON-based protocol with a server implementation that exposes the bus over WebSockets. This is useful, for example, when a web application needs to notify clients automatically of updates triggered by other clients.
- A client implementation of the protocol.
Note that this README was written almost entirely by Claude, and it makes bold claims. However, Claude did actually run the performance tests and obtained the claimed numbers on a 2021 M1 Max MacBook.
Features
Core EventBus
- MQTT-style topics with wildcards (+ single-level, # multi-level)
- Parameter extraction from topic patterns (users/+userId/events)
- Async & sync publishing with error handling
- Thread-safe, with a minimal-locking design
- Graceful shutdown and lifecycle management
Observability
- Optional OpenTelemetry integration (metrics + tracing)
- Standalone metrics provider (publishes to $metrics topic)
- Minimal overhead when observability disabled
- Comprehensive instrumentation (counters, histograms, gauges)
- Interface-based design for custom providers
Developer Experience
- Simple API with intuitive interfaces
- Structured logging with zap integration
- Comprehensive test suite with 83%+ coverage
- Type-safe subscriber patterns
- Mock-friendly for testing
Performance
- Configurable buffering (1000 messages default) for burst handling
- Atomic operations for counters
- Lazy metric creation for efficiency
- Context propagation for distributed tracing
- Minimal allocations in hot paths
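To make "atomic operations for counters" and "lazy metric creation" concrete, here is a minimal sketch using only the standard library. The `counterRegistry` type and its methods are hypothetical names chosen for illustration; Vinculum's internal implementation may look quite different.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counterRegistry is a hypothetical type, not part of Vinculum's API.
// Counters are created on first use and updated without holding a lock.
type counterRegistry struct {
	counters sync.Map // name -> *atomic.Int64
}

// Inc lazily creates the named counter if needed, then increments it atomically.
func (r *counterRegistry) Inc(name string) {
	c, ok := r.counters.Load(name)
	if !ok {
		// LoadOrStore guarantees that exactly one counter wins under contention.
		c, _ = r.counters.LoadOrStore(name, new(atomic.Int64))
	}
	c.(*atomic.Int64).Add(1)
}

// Value returns the current count, or 0 if the counter was never created.
func (r *counterRegistry) Value(name string) int64 {
	if c, ok := r.counters.Load(name); ok {
		return c.(*atomic.Int64).Load()
	}
	return 0
}

func main() {
	var reg counterRegistry
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reg.Inc("eventbus_messages_published_total")
		}()
	}
	wg.Wait()
	fmt.Println(reg.Value("eventbus_messages_published_total"))
}
```

The fast path is a single map load plus one atomic add, so a counter that is never touched costs nothing.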
WebSocket Components
Vinculum includes WebSocket client and server implementations for real-time web communication:
WebSocket Server
Expose your EventBus over WebSockets for real-time web applications:
- Real-time event streaming to web clients
- Bidirectional communication (subscribe + publish)
- Flexible authentication and authorization policies
- Built-in metrics and connection management
- Message transformations and filtering
WebSocket Server Documentation
WebSocket Client
Connect to Vinculum WebSocket servers from Go applications:
- Auto-reconnection with exponential backoff
- Subscription management and persistence
- Thread-safe operations
- Comprehensive error handling
- Builder pattern for easy configuration
WebSocket Client Documentation
Quick Start
package main

import (
	"context"
	"log"

	"github.com/tsarna/vinculum/pkg/vinculum"
	"github.com/tsarna/vinculum/pkg/vinculum/subutils"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewProduction()
	ctx := context.Background()

	// Create and start EventBus using builder pattern
	eventBus, err := vinculum.NewEventBus().
		WithLogger(logger).
		Build()
	if err != nil {
		log.Fatal(err)
	}
	eventBus.Start()
	defer eventBus.Stop()

	// Create subscriber (nil = standalone logging subscriber)
	subscriber := subutils.NewNamedLoggingSubscriber(nil, logger, zap.InfoLevel, "MyService")

	// Subscribe to topic pattern (context propagation enabled)
	eventBus.Subscribe(ctx, subscriber, "users/+userId/events")

	// Publish messages with context
	eventBus.Publish(ctx, "users/123/events", "User logged in")
	eventBus.PublishSync(ctx, "users/456/events", "User created account")
}
API Reference
EventBus Interface
type EventBus interface {
Start() error
Stop() error
Subscribe(ctx context.Context, subscriber Subscriber, topic string) error
Unsubscribe(ctx context.Context, subscriber Subscriber, topic string) error
UnsubscribeAll(ctx context.Context, subscriber Subscriber) error
Publish(ctx context.Context, topic string, payload any) error // Async, fire-and-forget
PublishSync(ctx context.Context, topic string, payload any) error // Sync, waits for completion
}
Subscriber Interface
type Subscriber interface {
OnSubscribe(ctx context.Context, topic string) error
OnUnsubscribe(ctx context.Context, topic string) error
OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error
}
Creating EventBus
Builder Pattern
// Basic EventBus
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
Build()
if err != nil {
return err
}
// With custom buffer size
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
WithBufferSize(2000).
Build()
if err != nil {
return err
}
// With observability
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
WithObservability(metricsProvider, tracingProvider).
WithServiceInfo("my-service", "v1.0.0").
WithBufferSize(1500).
Build()
if err != nil {
return err
}
// Step by step configuration
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
WithMetrics(metricsProvider).
WithTracing(tracingProvider).
WithBufferSize(500).
Build()
if err != nil {
return err
}
// Without logger (uses nop logger)
eventBus, err := vinculum.NewEventBus().
WithBufferSize(1000).
Build()
if err != nil {
return err
}
Topic Patterns
Wildcards
- + - Single-level wildcard (sensors/+/temperature)
- # - Multi-level wildcard (logs/#)
Parameter Extraction
// Subscribe to pattern
eventBus.Subscribe(ctx, subscriber, "users/+userId/orders/+orderId")
// Publish message
eventBus.Publish(ctx, "users/123/orders/456", orderData)
// Subscriber receives:
// topic = "users/123/orders/456"
// fields = {"userId": "123", "orderId": "456"}
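For readers curious how this kind of matching can work, the sketch below implements MQTT-style matching with named-parameter extraction using only the standard library. It is a simplified stand-in: Vinculum itself relies on the github.com/amir-yaghoubi/mqttpattern package, and `matchTopic` is a hypothetical function written for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic is a hypothetical helper, not part of Vinculum's API. It reports
// whether topic matches pattern and returns any named parameters.
//
//	"+"     matches exactly one level without capturing it
//	"+name" matches exactly one level and stores it in fields["name"]
//	"#"     matches all remaining levels and must be the last segment
func matchTopic(pattern, topic string) (map[string]string, bool) {
	pSegs := strings.Split(pattern, "/")
	tSegs := strings.Split(topic, "/")
	fields := map[string]string{}

	for i, p := range pSegs {
		if p == "#" {
			if i != len(pSegs)-1 {
				return nil, false // "#" is only valid as the final segment
			}
			return fields, true
		}
		if i >= len(tSegs) {
			return nil, false // topic has fewer levels than the pattern
		}
		if strings.HasPrefix(p, "+") {
			if name := p[1:]; name != "" {
				fields[name] = tSegs[i]
			}
			continue
		}
		if p != tSegs[i] {
			return nil, false // literal segment mismatch
		}
	}
	// Without a trailing "#", the level counts must be equal.
	if len(pSegs) != len(tSegs) {
		return nil, false
	}
	return fields, true
}

func main() {
	fields, ok := matchTopic("users/+userId/orders/+orderId", "users/123/orders/456")
	fmt.Println(ok, fields["userId"], fields["orderId"])

	_, ok = matchTopic("sensors/+/temperature", "sensors/a/b/temperature")
	fmt.Println(ok) // "+" covers one level only, so this does not match
}
```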
Observability Options
1. OpenTelemetry Integration
import "github.com/tsarna/vinculum/pkg/vinculum/otel"
provider := otel.NewProvider("my-service", "v1.0.0")
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
WithObservability(provider, provider).
WithServiceInfo("my-service", "v1.0.0").
Build()
if err != nil {
return err
}
2. Standalone Metrics (Zero Dependencies)
// Self-contained metrics via EventBus
metricsProvider := vinculum.NewStandaloneMetricsProvider(eventBus, &vinculum.StandaloneMetricsConfig{
Interval: 30 * time.Second, // Publish every 30s
MetricsTopic: "$metrics", // Topic for metrics
ServiceName: "my-service",
})
metricsProvider.Start()
defer metricsProvider.Stop()
// Subscribe to metrics
eventBus.Subscribe(ctx, metricsCollector, "$metrics")
Metrics JSON Format
{
"timestamp": "2025-08-28T23:02:30.773505-04:00",
"service_name": "my-service",
"counters": {
"eventbus_messages_published_total": 150,
"eventbus_messages_published_sync_total": 25,
"eventbus_subscriptions_total": 12,
"eventbus_errors_total": 0
},
"histograms": {
"eventbus_publish_duration_seconds": [0.001, 0.002, 0.001]
},
"gauges": {
"eventbus_active_subscribers": 8
}
}
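If the metrics arrive as JSON in the shape shown above, a consumer could decode them into a struct along these lines. The `metricsSnapshot` type and `parseMetrics` function are assumptions made for this sketch, and the payload type actually delivered to subscribers may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// metricsSnapshot is a hypothetical struct mirroring the JSON format above.
type metricsSnapshot struct {
	Timestamp   time.Time            `json:"timestamp"`
	ServiceName string               `json:"service_name"`
	Counters    map[string]int64     `json:"counters"`
	Histograms  map[string][]float64 `json:"histograms"`
	Gauges      map[string]float64   `json:"gauges"`
}

// parseMetrics decodes one metrics document.
func parseMetrics(data []byte) (metricsSnapshot, error) {
	var snap metricsSnapshot
	err := json.Unmarshal(data, &snap)
	return snap, err
}

func main() {
	raw := []byte(`{
		"timestamp": "2025-08-28T23:02:30.773505-04:00",
		"service_name": "my-service",
		"counters": {"eventbus_messages_published_total": 150},
		"histograms": {"eventbus_publish_duration_seconds": [0.001, 0.002, 0.001]},
		"gauges": {"eventbus_active_subscribers": 8}
	}`)
	snap, err := parseMetrics(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(snap.ServiceName, snap.Counters["eventbus_messages_published_total"])
}
```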
Testing
Built-in Test Utilities
// Mock subscriber for testing
mockSub := &vinculum.MockSubscriber{}
eventBus.Subscribe(ctx, mockSub, "test/+param")
eventBus.Publish(ctx, "test/value", "message")
// Verify events
events := mockSub.GetEvents()
assert.Equal(t, 1, len(events))
assert.Equal(t, "test/value", events[0].Topic)
assert.Equal(t, "value", events[0].Fields["param"])
Logging Subscriber for Debugging
// Logs all events with structured data (standalone mode)
debugSub := subutils.NewNamedLoggingSubscriber(nil, logger, zap.DebugLevel, "Debug")
eventBus.Subscribe(ctx, debugSub, "#") // Subscribe to everything
// Or wrap another subscriber to add logging
// wrappedSub := subutils.NewNamedLoggingSubscriber(mySubscriber, logger, zap.DebugLevel, "Debug")
Architecture
Design Principles
- Interface-based - Easy to mock and extend
- Dependency inversion - Core doesn't depend on observability
- Zero-cost abstractions - No overhead when features not used
- Thread-safe - Safe for concurrent use
- Graceful degradation - Continues working on errors
Performance Characteristics
- ~110ns per publish operation (no observability, 0 allocations)
- ~529ns per publish operation (with OpenTelemetry observability, 12 allocations)
- ~180ns per publish operation (with standalone metrics, 1 allocation)
- ~706ns per PublishSync operation (waits for completion, 3 allocations)
- 4.6+ million messages/second throughput capability
- Zero allocations in async publish hot path (no observability)
- Optimized hot path with minimal overhead
- Configurable buffering (default: 1000 messages) with message dropping for backpressure
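"Message dropping for backpressure" is commonly implemented as a non-blocking send on a buffered channel. The sketch below shows that general technique. The `message` type and `tryEnqueue` function are hypothetical, and Vinculum's internals may differ in detail.

```go
package main

import "fmt"

// message is a hypothetical envelope; Vinculum's internal type may differ.
type message struct {
	topic   string
	payload any
}

// tryEnqueue attempts a non-blocking send. It returns false, meaning the
// message is dropped, when the buffer is full, so the publisher never blocks.
func tryEnqueue(queue chan<- message, m message) bool {
	select {
	case queue <- m:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan message, 2) // tiny buffer so the drop is easy to see
	for i := 0; i < 3; i++ {
		ok := tryEnqueue(queue, message{topic: "demo", payload: i})
		fmt.Println("enqueued:", ok)
	}
}
```

Dropping instead of blocking keeps publish latency flat, at the cost of losing messages once the buffer is exhausted.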
Dependencies
Core (Required)
- go.uber.org/zap - Structured logging
- github.com/amir-yaghoubi/mqttpattern - Topic pattern matching
Optional (Only when used)
- go.opentelemetry.io/otel - OpenTelemetry integration
Configuration
Buffer Size Configuration
Control the internal channel buffer size to handle burst traffic:
// Custom buffer size for high-throughput scenarios
eventBus, err := vinculum.NewEventBus().
WithLogger(logger).
WithBufferSize(5000). // Default is 1000
Build()
if err != nil {
return err
}
Buffer Size Guidelines:
- Default (1000): Good for most applications (~0.2ms burst capacity)
- Small (100-500): Memory-constrained environments
- Large (2000+): High-throughput, burst-heavy workloads
- Very Large (10000+): Extreme burst scenarios (consider batching instead)
Examples
Basic Pub/Sub
subscriber := &MySubscriber{}
eventBus.Subscribe(ctx, subscriber, "notifications/+type")
eventBus.Publish(ctx, "notifications/email", emailData)
Error Handling
err := eventBus.PublishSync(ctx, "critical/operation", data)
if err != nil {
	// logger is the *zap.Logger used elsewhere in this README
	logger.Error("Critical operation failed", zap.Error(err))
}
Graceful Shutdown
// Start EventBus
eventBus.Start()
// Handle shutdown signal
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
// Graceful shutdown
eventBus.Stop() // Waits for in-flight messages
Protocol
Both components implement the Vinculum WebSocket Protocol:
- JSON-based with compact message format
- MQTT-style topic patterns
- Request/response correlation
- Error handling and acknowledgments
Use Cases
- Microservice communication within a process
- Event-driven architectures
- Decoupled component communication
- Real-time data processing pipelines
- Plugin systems with event coordination
- Application telemetry and monitoring
- Real-time web applications with WebSocket integration
License
MIT License - see LICENSE file for details.
Vinculum (Latin: "bond" or "link") - connecting your application components with reliable, observable messaging.
Directories
| Path | Synopsis |
|---|---|
| pkg/vinculum/otel | Package otel provides OpenTelemetry implementations for vinculum observability interfaces. |