vinculum

module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2025 License: BSD-2-Clause

README ΒΆ

Vinculum

"The [vinculum is the] processing device at the core of every Borg vessel. It interconnects the minds of all the drones." -- Seven of Nine (In Voyager episode "Infinite Regress")

Vinculum is several things:

  • A high-performance, feature-rich in-process EventBus for Go with MQTT-style topic patterns, and optional observability.
  • An optional, simple, JSON-based protocol with a server implementation to expose the bus over WebSockets. This is a good way for a web application to automatically notify clients of updates based on actions taken by other clients, for example.
  • A client implementation of the protocol

Note that this README was written almost entirely by Claude and it makes bold claims, but it did actually test the performance and got the claimed numbers on a 2021 M1 Max MacBook.

✨ Features

πŸš€ Core EventBus
  • MQTT-style topics with wildcards (+ single-level, # multi-level)
  • Parameter extraction from topic patterns (users/+userId/events)
  • Async & sync publishing with error handling
  • Thread-safe with minimal locking design
  • Graceful shutdown and lifecycle management
πŸ“Š Observability
  • Optional OpenTelemetry integration (metrics + tracing)
  • Standalone metrics provider (publishes to $metrics topic)
  • Minimal overhead when observability disabled
  • Comprehensive instrumentation (counters, histograms, gauges)
  • Interface-based design for custom providers
πŸ”§ Developer Experience
  • Simple API with intuitive interfaces
  • Structured logging with zap integration
  • Comprehensive test suite with 83%+ coverage
  • Type-safe subscriber patterns
  • Mock-friendly for testing
⚑ Performance
  • Configurable buffering (1000 messages default) for burst handling
  • Atomic operations for counters
  • Lazy metric creation for efficiency
  • Context propagation for distributed tracing
  • Minimal allocations in hot paths

🌐 WebSocket Components

Vinculum includes WebSocket client and server implementations for real-time web communication:

πŸ“‘ WebSocket Server

Expose your EventBus over WebSockets for real-time web applications:

  • Real-time event streaming to web clients
  • Bidirectional communication (subscribe + publish)
  • Flexible authentication and authorization policies
  • Built-in metrics and connection management
  • Message transformations and filtering

πŸ“– WebSocket Server Documentation

πŸ”Œ WebSocket Client

Connect to Vinculum WebSocket servers from Go applications:

  • Auto-reconnection with exponential backoff
  • Subscription management and persistence
  • Thread-safe operations
  • Comprehensive error handling
  • Builder pattern for easy configuration

πŸ“– WebSocket Client Documentation

πŸš€ Quick Start

package main

import (
    "context"
    "github.com/tsarna/vinculum/pkg/vinculum"
    "github.com/tsarna/vinculum/pkg/vinculum/subutils"
    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewProduction()
    ctx := context.Background()
    
    // Create and start EventBus using builder pattern
    eventBus, err := vinculum.NewEventBus().
        WithLogger(logger).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    eventBus.Start()
    defer eventBus.Stop()
    
    // Create subscriber (nil = standalone logging subscriber)
    subscriber := subutils.NewNamedLoggingSubscriber(nil, logger, zap.InfoLevel, "MyService")
    
    // Subscribe to topic pattern (context propagation enabled)
    eventBus.Subscribe(ctx, subscriber, "users/+userId/events")
    
    // Publish messages with context
    eventBus.Publish(ctx, "users/123/events", "User logged in")
    eventBus.PublishSync(ctx, "users/456/events", "User created account")
}

πŸ“š API Reference

EventBus Interface
type EventBus interface {
    Start() error
    Stop() error
    
    Subscribe(ctx context.Context, subscriber Subscriber, topic string) error
    Unsubscribe(ctx context.Context, subscriber Subscriber, topic string) error
    UnsubscribeAll(ctx context.Context, subscriber Subscriber) error
    
    Publish(ctx context.Context, topic string, payload any) error      // Async, fire-and-forget
    PublishSync(ctx context.Context, topic string, payload any) error  // Sync, waits for completion
}
Subscriber Interface
type Subscriber interface {
    OnSubscribe(ctx context.Context, topic string) error
    OnUnsubscribe(ctx context.Context, topic string) error
    OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error
}
Creating EventBus
Builder Pattern
// Basic EventBus
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    Build()
if err != nil {
    return err
}

// With custom buffer size
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    WithBufferSize(2000).
    Build()
if err != nil {
    return err
}

// With observability
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    WithObservability(metricsProvider, tracingProvider).
    WithServiceInfo("my-service", "v1.0.0").
    WithBufferSize(1500).
    Build()
if err != nil {
    return err
}

// Step by step configuration
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    WithMetrics(metricsProvider).
    WithTracing(tracingProvider).
    WithBufferSize(500).
    Build()
if err != nil {
    return err
}

// Without logger (uses nop logger)
eventBus, err := vinculum.NewEventBus().
    WithBufferSize(1000).
    Build()
if err != nil {
    return err
}

🎯 Topic Patterns

Wildcards
  • + - Single-level wildcard (sensors/+/temperature)
  • # - Multi-level wildcard (logs/#)
Parameter Extraction
// Subscribe to pattern
eventBus.Subscribe(ctx, subscriber, "users/+userId/orders/+orderId")

// Publish message
eventBus.Publish(ctx, "users/123/orders/456", orderData)

// Subscriber receives:
// topic = "users/123/orders/456"
// fields = {"userId": "123", "orderId": "456"}

πŸ“Š Observability Options

1. OpenTelemetry Integration
import "github.com/tsarna/vinculum/pkg/vinculum/otel"

provider := otel.NewProvider("my-service", "v1.0.0")
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    WithObservability(provider, provider).
    WithServiceInfo("my-service", "v1.0.0").
    Build()
if err != nil {
    return err
}
2. Standalone Metrics (Zero Dependencies)
// Self-contained metrics via EventBus
metricsProvider := vinculum.NewStandaloneMetricsProvider(eventBus, &vinculum.StandaloneMetricsConfig{
    Interval:     30 * time.Second,  // Publish every 30s
    MetricsTopic: "$metrics",        // Topic for metrics
    ServiceName:  "my-service",
})

metricsProvider.Start()
defer metricsProvider.Stop()

// Subscribe to metrics
eventBus.Subscribe(ctx, metricsCollector, "$metrics")
Metrics JSON Format
{
  "timestamp": "2025-08-28T23:02:30.773505-04:00",
  "service_name": "my-service",
  "counters": {
    "eventbus_messages_published_total": 150,
    "eventbus_messages_published_sync_total": 25,
    "eventbus_subscriptions_total": 12,
    "eventbus_errors_total": 0
  },
  "histograms": {
    "eventbus_publish_duration_seconds": [0.001, 0.002, 0.001]
  },
  "gauges": {
    "eventbus_active_subscribers": 8
  }
}

πŸ§ͺ Testing

Built-in Test Utilities
// Mock subscriber for testing
mockSub := &vinculum.MockSubscriber{}
eventBus.Subscribe(ctx, mockSub, "test/+param")

eventBus.Publish(ctx, "test/value", "message")

// Verify events
events := mockSub.GetEvents()
assert.Equal(t, 1, len(events))
assert.Equal(t, "test/value", events[0].Topic)
assert.Equal(t, "value", events[0].Fields["param"])
Logging Subscriber for Debugging
// Logs all events with structured data (standalone mode)
debugSub := subutils.NewNamedLoggingSubscriber(nil, logger, zap.DebugLevel, "Debug")
eventBus.Subscribe(ctx, debugSub, "#") // Subscribe to everything

// Or wrap another subscriber to add logging
// wrappedSub := subutils.NewNamedLoggingSubscriber(mySubscriber, logger, zap.DebugLevel, "Debug")

πŸ—οΈ Architecture

Design Principles
  • Interface-based - Easy to mock and extend
  • Dependency inversion - Core doesn't depend on observability
  • Zero-cost abstractions - No overhead when features not used
  • Thread-safe - Safe for concurrent use
  • Graceful degradation - Continues working on errors
Performance Characteristics
  • ~110ns per publish operation (no observability, 0 allocations)
  • ~529ns per publish operation (with OpenTelemetry observability, 12 allocations)
  • ~180ns per publish operation (with standalone metrics, 1 allocation)
  • ~706ns per PublishSync operation (waits for completion, 3 allocations)
  • 4.6+ million messages/second throughput capability
  • Zero allocations in async publish hot path (no observability)
  • Optimized hot path with minimal overhead
  • Configurable buffering (default: 1000 messages) with message dropping for backpressure

πŸ”— Dependencies

Core (Required)
  • go.uber.org/zap - Structured logging
  • github.com/amir-yaghoubi/mqttpattern - Topic pattern matching
Optional (Only when used)
  • go.opentelemetry.io/otel - OpenTelemetry integration

βš™οΈ Configuration

Buffer Size Configuration

Control the internal channel buffer size to handle burst traffic:

// Custom buffer size for high-throughput scenarios
eventBus, err := vinculum.NewEventBus().
    WithLogger(logger).
    WithBufferSize(5000). // Default is 1000
    Build()
if err != nil {
    return err
}

Buffer Size Guidelines:

  • Default (1000): Good for most applications (~0.2ms burst capacity)
  • Small (100-500): Memory-constrained environments
  • Large (2000+): High-throughput, burst-heavy workloads
  • Very Large (10000+): Extreme burst scenarios (consider batching instead)

πŸ“– Examples

Basic Pub/Sub
subscriber := &MySubscriber{}
eventBus.Subscribe(ctx, subscriber, "notifications/+type")
eventBus.Publish(ctx, "notifications/email", emailData)
Error Handling
err := eventBus.PublishSync(ctx, "critical/operation", data)
if err != nil {
    log.Error("Critical operation failed", zap.Error(err))
}
Graceful Shutdown
// Start EventBus
eventBus.Start()

// Handle shutdown signal
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c

// Graceful shutdown
eventBus.Stop() // Waits for in-flight messages
πŸ“‹ Protocol

Both components implement the Vinculum WebSocket Protocol:

  • JSON-based with compact message format
  • MQTT-style topic patterns
  • Request/response correlation
  • Error handling and acknowledgments

πŸ“– Protocol Specification

🎯 Use Cases

  • Microservice communication within a process
  • Event-driven architectures
  • Decoupled component communication
  • Real-time data processing pipelines
  • Plugin systems with event coordination
  • Application telemetry and monitoring
  • Real-time web applications with WebSocket integration

πŸ“„ License

MIT License - see LICENSE file for details.


Vinculum (Latin: "bond" or "link") - connecting your application components with reliable, observable messaging.

Directories ΒΆ

Path Synopsis
pkg
vinculum/otel
Package otel provides OpenTelemetry implementations for vinculum observability interfaces.
Package otel provides OpenTelemetry implementations for vinculum observability interfaces.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL