server

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: BSD-2-Clause Imports: 15 Imported by: 1

README

Vinculum WebSocket Server

The Vinculum WebSocket Server provides real-time bidirectional communication between web clients and the Vinculum EventBus. It enables web applications to subscribe to events, publish events, and receive real-time updates through WebSocket connections.

Features

  • Real-time Event Streaming - Subscribe to EventBus topics and receive events in real-time
  • Bidirectional Communication - Clients can both subscribe to and publish events
  • Flexible Authentication - Configurable event authorization policies
  • Subscription Control - Fine-grained control over client subscriptions
  • Message Transformation - Transform messages before sending to clients
  • Metrics Integration - Built-in metrics for monitoring connection health and performance
  • Connection Management - Automatic connection lifecycle management with graceful shutdown
  • Protocol Compliance - Implements the Vinculum WebSocket Protocol

Quick Start

Basic Setup
package main

import (
    "log"
    "net/http"
    
    "github.com/tsarna/vinculum/pkg/vinculum"
    "github.com/tsarna/vinculum/pkg/vinculum/vws/server"
    "go.uber.org/zap"
)

func main() {
    // Create logger
    logger, _ := zap.NewDevelopment()
    
    // Create EventBus
    eventBus, err := vinculum.NewEventBus().
        WithLogger(logger).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    
    // Start EventBus
    if err := eventBus.Start(); err != nil {
        log.Fatal(err)
    }
    defer eventBus.Stop()
    
    // Create WebSocket listener
    listener, err := server.NewListener().
        WithEventBus(eventBus).
        WithLogger(logger).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    
    // Set up HTTP server
    http.HandleFunc("/ws", listener.ServeWebsocket)
    
    log.Println("WebSocket server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
Advanced Configuration
// Create listener with custom configuration
listener, err := server.NewListener().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithMetricsProvider(metricsProvider).
    WithQueueSize(512).                                    // Custom message queue size
    WithPingInterval(30 * time.Second).                    // WebSocket ping interval
    WithWriteTimeout(10 * time.Second).                    // Write timeout
    WithEventAuth(server.AllowTopicPrefix("client/")).     // Event authorization
    WithSubscriptionController(myControllerFactory).       // Custom subscription control
    WithInitialSubscriptions("system/alerts", "status").   // Auto-subscribe new connections
    WithOutboundTransforms(filterSensitive, addTimestamp).  // Outbound message transformations
    WithInboundTransforms(validateInput, addSource).        // Inbound message transformations
    Build()

Configuration Options

Connection Settings
Method Description Default
WithQueueSize(size) Sets the message queue size per connection 256
WithPingInterval(duration) Sets WebSocket ping interval 30s
WithWriteTimeout(duration) Sets write timeout for messages 10s
Security & Authorization
Event Authorization

Control which events clients can publish:

// Allow all events (not recommended for production)
.WithEventAuth(server.AllowAllEvents)

// Deny all events (default - secure)
.WithEventAuth(server.DenyAllEvents)

// Allow events only to specific topic prefixes
.WithEventAuth(server.AllowTopicPrefix("client/"))

// Custom authorization logic
.WithEventAuth(func(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error) {
    // Custom validation logic
    if !isAuthorized(ctx, msg.Topic) {
        return nil, fmt.Errorf("unauthorized topic: %s", msg.Topic)
    }
    return msg, nil
})
Subscription Control

Control which topics clients can subscribe to:

// Allow all subscriptions (default)
.WithSubscriptionController(server.NewPassthroughSubscriptionController)

// Custom subscription control
.WithSubscriptionController(func(logger *zap.Logger) server.SubscriptionController {
    return &MyCustomController{logger: logger}
})
Message Processing
Initial Subscriptions

Automatically subscribe new connections to specific topics:

.WithInitialSubscriptions("system/alerts", "server/status", "user/+/notifications")
Message Transformations

Transform messages before sending to clients:

// Example: Add timestamp to all messages
func addTimestamp(msg *vws.WireMessage) *vws.WireMessage {
    if msg.Data == nil {
        msg.Data = make(map[string]interface{})
    }
    msg.Data["timestamp"] = time.Now().Unix()
    return msg
}

// Example: Filter sensitive data
func filterSensitive(msg *vws.WireMessage) *vws.WireMessage {
    if data, ok := msg.Data["password"]; ok {
        delete(msg.Data, "password")
    }
    return msg
}

.WithOutboundTransforms(addTimestamp, filterSensitive)
.WithInboundTransforms(validatePayload, addClientInfo)

Monitoring & Metrics

Built-in Metrics

When a MetricsProvider is configured, the server automatically tracks:

  • Connection Metrics

    • websocket_connections_total - Total active connections
    • websocket_connections_created_total - Connections created (counter)
    • websocket_connections_closed_total - Connections closed (counter)
  • Message Metrics

    • websocket_messages_sent_total - Messages sent to clients (counter)
    • websocket_messages_received_total - Messages received from clients (counter)
    • websocket_message_send_duration - Time to send messages (histogram)
  • Error Metrics

    • websocket_errors_total - WebSocket errors (counter)
Example with Metrics
// Create standalone metrics provider
metricsProvider := o11y.NewStandaloneMetricsProvider(nil, &o11y.StandaloneMetricsConfig{
    Interval:     30 * time.Second,
    MetricsTopic: "$metrics",
    ServiceName:  "websocket-server",
})

// Create listener with metrics
listener, err := server.NewListener().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithMetricsProvider(metricsProvider).
    Build()

// Start metrics provider
metricsProvider.SetEventBus(eventBus)
metricsProvider.Start()

Protocol

The WebSocket server implements the Vinculum WebSocket Protocol, a JSON-based protocol.

Error Handling

The server provides detailed error responses for various scenarios:

  • Invalid JSON - Malformed message format
  • Missing Required Fields - Missing k (kind) or other required fields
  • Authorization Errors - Event publishing denied by authorization policy
  • Subscription Errors - Subscription denied by subscription controller
  • Connection Errors - WebSocket protocol violations

Best Practices

Security
  1. Use Event Authorization - Always configure appropriate event authorization policies
  2. Validate Topic Patterns - Implement subscription controllers to validate topic access
  3. Rate Limiting - Consider implementing rate limiting for event publishing
  4. Authentication - Implement authentication at the HTTP layer before WebSocket upgrade
Performance
  1. Queue Size - Adjust queue size based on expected message volume
  2. Timeouts - Configure appropriate timeouts for your use case
  3. Message Transforms - Keep transformations lightweight to avoid blocking
  4. Metrics - Monitor connection and message metrics to identify bottlenecks
Reliability
  1. Graceful Shutdown - The server handles graceful shutdown automatically
  2. Connection Monitoring - Use ping intervals to detect dead connections
  3. Error Handling - Implement proper error handling in client applications
  4. Reconnection Logic - Implement client-side reconnection with exponential backoff

Examples

See the example_metrics_test.go file for complete working examples including:

  • Basic WebSocket server setup
  • Metrics integration
  • Custom event authorization
  • Message transformations
  • Production-ready configuration

API Reference

Types
  • Listener - Main WebSocket server that handles connections
  • ListenerConfig - Builder for configuring the listener
  • EventAuthFunc - Function type for event authorization
  • SubscriptionController - Interface for controlling subscriptions
  • SubscriptionControllerFactory - Factory function for creating controllers
Key Methods
  • NewListener() - Creates a new listener configuration builder
  • Build() - Builds the configured listener
  • ServeWebsocket(w, r) - HTTP handler for WebSocket upgrades
  • Shutdown() - Gracefully shuts down the listener

For detailed API documentation, see the Go package documentation.

Documentation

Index

Examples

Constants

View Source
const (
	// DefaultQueueSize is the default size for the WebSocket message queue.
	// This provides a good balance between memory usage and burst handling capacity.
	DefaultQueueSize = 256

	// DefaultPingInterval is the default interval for sending WebSocket ping frames.
	// This helps detect dead connections and maintain connection health.
	DefaultPingInterval = 30 * time.Second

	// DefaultReadTimeout is the default timeout for reading messages from clients.
	// Should be longer than ping interval to allow for pong responses.
	DefaultReadTimeout = 60 * time.Second

	// DefaultWriteTimeout is the default timeout for writing messages to clients.
	// Should be short enough to detect slow/dead clients quickly.
	DefaultWriteTimeout = 10 * time.Second
)

Variables

This section is empty.

Functions

func AllowAllEvents

func AllowAllEvents(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error)

AllowAllEvents is an EventAuthFunc that allows all events without modification.

func DenyAllEvents

func DenyAllEvents(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error)

DenyAllEvents is an EventAuthFunc that denies all events with a generic error. This is the default policy for security.

func DropAllEvents

func DropAllEvents(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error)

DropAllEvents is an EventAuthFunc that silently drops all events. The client receives an ACK response but the event is not published to the EventBus. This is useful for temporarily disabling event publishing without breaking clients.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents an individual WebSocket connection that integrates with the EventBus. It implements the Subscriber interface to receive events from the EventBus and forwards them to the WebSocket client. It also handles incoming messages from the WebSocket client and can publish them to the EventBus.

func (*Connection) OnEvent

func (c *Connection) OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error

OnEvent is called when an event is published to a topic this connection is subscribed to. This method forwards the event to the WebSocket client. Note: This method is called directly by the AsyncQueueingSubscriber wrapper, so it sends messages directly without additional queuing.

func (*Connection) OnSubscribe

func (c *Connection) OnSubscribe(ctx context.Context, topic string) error

func (*Connection) OnUnsubscribe

func (c *Connection) OnUnsubscribe(ctx context.Context, topic string) error

func (*Connection) PassThrough

func (c *Connection) PassThrough(msg bus.EventBusMessage) error

PassThrough handles messages that should be processed directly without going through the normal event processing pipeline. This includes: - MessageTypeTick: Periodic ping messages for connection health monitoring - Other message types: Direct WebSocket message sending (e.g., responses)

func (*Connection) Start

func (c *Connection) Start()

Start begins handling the WebSocket connection. This method automatically subscribes to any configured initial subscriptions and handles both incoming and outbound messages for the WebSocket client.

This method blocks until the connection is closed, running the message reader directly in the calling goroutine for efficiency.

type EventAuthFunc

type EventAuthFunc func(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error)

An EventAuthFunc is called to authorize and potentially modify client-published events. It can:

  • Return (nil, nil) to silently drop the event (no error response sent)
  • Return (nil, error) to deny the event with an error response
  • Return (originalMsg, nil) to allow the event unchanged
  • Return (modifiedMsg, nil) to allow the event with modifications

func AllowTopicPattern

func AllowTopicPattern(pattern string) EventAuthFunc

AllowTopicPattern returns an EventAuthFunc that only allows events to topics that match the specified MQTT-style pattern. This provides more flexible topic filtering than simple prefix matching.

Pattern examples:

  • "sensor/+/data" - allows sensor/temperature/data, sensor/humidity/data, etc.
  • "events/+category/#" - allows events/user/login, events/system/alerts/critical, etc.
  • "client/+userId/actions/+" - allows client/john/actions/login, client/jane/actions/logout, etc.

func AllowTopicPrefix

func AllowTopicPrefix(prefix string) EventAuthFunc

AllowTopicPrefix returns an EventAuthFunc that only allows events to topics with the specified prefix. This is useful for sandboxing client events. You probably want the prefix to end with a slash.

func ChainEventAuth

func ChainEventAuth(funcs ...EventAuthFunc) EventAuthFunc

ChainEventAuth returns an EventAuthFunc that applies multiple EventAuthFunc functions in sequence. The first function that returns an error or drops the message (nil, nil) will stop the chain. If all functions allow the message, the final modified message is returned.

This allows combining multiple authorization policies:

// Allow only client events, but drop test clients
chainedAuth := ChainEventAuth(
    AllowTopicPrefix("client/"),
    func(ctx context.Context, msg *vws.WireMessage) (*vws.WireMessage, error) {
        if strings.Contains(msg.Topic, "/test/") {
            return nil, nil // Silently drop test events
        }
        return msg, nil
    },
)

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener handles incoming WebSocket connections and integrates them with the EventBus. It manages the lifecycle of WebSocket connections, authentication, and message routing between WebSocket clients and the EventBus.

Example (MetricsDisabled)

ExampleListener_metricsDisabled shows how to run the WebSocket server without metrics.

package main

import (
	"fmt"
	"log"
	"net/http"

	bus "github.com/tsarna/vinculum-bus"
	"github.com/tsarna/vinculum-vws/server"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	eventBus, err := bus.NewEventBus().
		WithLogger(logger).
		WithBufferSize(1000).
		Build()
	if err != nil {
		log.Fatalf("Build() returned error: %v", err)
	}
	defer eventBus.Stop()

	if err := eventBus.Start(); err != nil {
		log.Fatal(err)
	}

	// Create WebSocket listener without meter provider
	listener, err := server.NewListener().
		WithEventBus(eventBus).
		WithLogger(logger).
		// No WithMeterProvider() call - metrics will be disabled
		Build()
	if err != nil {
		log.Fatal(err)
	}

	http.Handle("/ws", listener)

	fmt.Println("WebSocket server running without metrics on :8080")

	srv := &http.Server{
		Addr:    ":8080",
		Handler: http.DefaultServeMux,
	}

	log.Fatal(srv.ListenAndServe())
}
Example (MetricsWithNoopProvider)

ExampleListener_metricsWithNoopProvider shows how to use a noop meter provider for testing.

package main

import (
	"fmt"
	"log"
	"net/http"

	bus "github.com/tsarna/vinculum-bus"
	"github.com/tsarna/vinculum-vws/server"
	"go.opentelemetry.io/otel/metric/noop"
	"go.uber.org/zap"
)

func main() {

	// Create logger and EventBus
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	eventBus, err := bus.NewEventBus().
		WithLogger(logger).
		WithBufferSize(1000).
		Build()
	if err != nil {
		log.Fatalf("Build() returned error: %v", err)
	}
	defer eventBus.Stop()

	if err := eventBus.Start(); err != nil {
		log.Fatal(err)
	}

	// Use noop meter provider (useful for testing)
	mp := noop.NewMeterProvider()

	// Create WebSocket listener
	listener, err := server.NewListener().
		WithEventBus(eventBus).
		WithLogger(logger).
		WithMeterProvider(mp).
		Build()
	if err != nil {
		log.Fatal(err)
	}

	// Set up HTTP server
	http.Handle("/ws", listener)

	fmt.Println("WebSocket server with noop metrics provider running on :8080")

	// Start server
	srv := &http.Server{
		Addr:    ":8080",
		Handler: http.DefaultServeMux,
	}

	log.Fatal(srv.ListenAndServe())
}

func (*Listener) ConnectionCount

func (l *Listener) ConnectionCount() int

ConnectionCount returns the current number of active WebSocket connections. This is useful for monitoring and health checks.

func (*Listener) ServeHTTP

func (l *Listener) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles incoming HTTP requests and upgrades them to WebSocket connections. This method can be plugged directly into HTTP routers (e.g., chi, gorilla/mux, net/http).

The method will:

  • Upgrade the HTTP connection to WebSocket
  • Handle the WebSocket connection lifecycle
  • Integrate the connection with the EventBus for pub/sub functionality

Usage example:

listener := NewListener(eventBus, logger)
http.Handle("/ws", listener.ServeWebsocket)

func (*Listener) Shutdown

func (l *Listener) Shutdown(ctx context.Context) error

Shutdown gracefully closes all active WebSocket connections and stops accepting new ones. This method should be called when the server is shutting down to ensure proper cleanup.

The shutdown process:

  1. Stop accepting new connections (returns StatusServiceRestart)
  2. Close all active connections with StatusGoingAway
  3. Wait for all connections to finish cleanup

This method blocks until all connections are closed or the context is cancelled.

type ListenerConfig

type ListenerConfig struct {
	// contains filtered or unexported fields
}

ListenerConfig holds the configuration for creating a WebSocket Listener. Use NewListener() to create a new configuration and chain methods to set the required parameters before calling Build().

func NewListener

func NewListener() *ListenerConfig

NewListener creates a new ListenerConfig for building a WebSocket Listener. Use the fluent methods to set the required EventBus and Logger, then call Build().

Example:

listener, err := vws.NewListener().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithMeterProvider(meterProvider).
    WithQueueSize(512).
    WithPingInterval(45 * time.Second).
    WithEventAuth(AllowTopicPrefix("client/")).
    WithSubscriptionController(myControllerFactory).
    WithInitialSubscriptions("system/alerts", "server/status").
    WithOutboundTransforms(filterSensitive, addTimestamp).
    Build()

func (*ListenerConfig) Build

func (c *ListenerConfig) Build() (*Listener, error)

Build creates a new WebSocket Listener from the configuration. Returns an error if the configuration is invalid (missing EventBus or Logger).

The returned Listener is ready to accept WebSocket connections and integrate them with the EventBus for real-time message streaming.

func (*ListenerConfig) IsValid

func (c *ListenerConfig) IsValid() error

IsValid checks if the configuration has all required parameters set. Returns nil if the configuration is valid, or an error describing what's missing.

func (*ListenerConfig) WithEventAuth

func (c *ListenerConfig) WithEventAuth(authFunc EventAuthFunc) *ListenerConfig

WithEventAuth sets the event authorization function for client-published events. This function determines whether clients are allowed to publish events to the EventBus and can modify events before publishing.

The function receives the original message and returns:

  • (*WireMessage, nil): Use the returned modified message
  • (nil, nil): Use the original message unchanged
  • (nil, error): Deny the event with the given error

Predefined options:

  • DenyAllEvents: Blocks all client events (secure default)
  • AllowAllEvents: Allows all client events (development/trusted environments)
  • AllowTopicPrefix("prefix/"): Only allows events to topics with specific prefix

Default: DenyAllEvents (secure)

func (*ListenerConfig) WithEventBus

func (c *ListenerConfig) WithEventBus(eventBus bus.EventBus) *ListenerConfig

WithEventBus sets the EventBus for the WebSocket Listener. The EventBus is required for integrating WebSocket connections with the pub/sub system.

func (*ListenerConfig) WithInboundTransforms

func (c *ListenerConfig) WithInboundTransforms(transforms ...transform.MessageTransformFunc) *ListenerConfig

WithInboundTransforms sets the message transformation functions that will be applied to inbound messages from WebSocket clients before publishing to the EventBus. These transforms use the transform.MessageTransformFunc type and operate on EventBusMessage created from the WebSocket message.

Transform functions are called in the order they are provided and can:

  • Modify message content (topic, payload, fields)
  • Drop messages (return nil message)
  • Stop the transform pipeline (return false)

Example:

transforms := []transform.MessageTransformFunc{
    transform.FilterByTopicPrefix("allowed/"),
    transform.AddField("source", "websocket"),
    transform.ValidatePayload(),
}
config.WithInboundTransforms(transforms...)

Default: No transforms (messages published as-is after authorization)

func (*ListenerConfig) WithInitialSubscriptions

func (c *ListenerConfig) WithInitialSubscriptions(topics ...string) *ListenerConfig

WithInitialSubscriptions sets the topic patterns that new WebSocket connections should be automatically subscribed to when they connect. These subscriptions happen automatically without client request and bypass the subscription controller.

This is useful for:

  • Pushing server-side events to all clients
  • Providing default subscriptions for convenience
  • Broadcasting system notifications

Example:

config.WithInitialSubscriptions("system/alerts", "server/status")

Default: No initial subscriptions

func (*ListenerConfig) WithLogger

func (c *ListenerConfig) WithLogger(logger *zap.Logger) *ListenerConfig

WithLogger sets the Logger for the WebSocket Listener. The Logger is required for connection events, errors, and debugging.

func (*ListenerConfig) WithMeterProvider added in v0.11.0

func (c *ListenerConfig) WithMeterProvider(provider metric.MeterProvider) *ListenerConfig

WithMeterProvider sets the OTel MeterProvider for the WebSocket Listener. The MeterProvider is optional and enables collection of WebSocket server metrics such as connection counts, message rates, error rates, and connection durations.

If not provided, no metrics will be collected.

func (*ListenerConfig) WithOutboundTransforms

func (c *ListenerConfig) WithOutboundTransforms(transforms ...transform.MessageTransformFunc) *ListenerConfig

WithOutboundTransforms sets the message transformation functions that will be applied to outbound messages from the EventBus before sending to WebSocket clients. These transforms use the new transform.MessageTransformFunc type and operate on EventBusMessage rather than WebSocketMessage.

Transform functions are called in the order they are provided and work with the subscriber wrapper pattern for better separation of concerns.

Example:

transforms := []transform.MessageTransformFunc{
    transform.DropByTopicPattern("internal/*"),
    transform.AddTimestamp(),
    transform.FilterByTopicPrefix("public/"),
}
config.WithOutboundTransforms(transforms...)

Default: No transforms (messages sent as-is)

func (*ListenerConfig) WithPingInterval

func (c *ListenerConfig) WithPingInterval(interval time.Duration) *ListenerConfig

WithPingInterval sets the interval for sending WebSocket ping frames. This helps detect dead connections and maintain connection health. Must be positive. Set to 0 to disable ping/pong health monitoring.

Default: 30 seconds

func (*ListenerConfig) WithQueueSize

func (c *ListenerConfig) WithQueueSize(size int) *ListenerConfig

WithQueueSize sets the message queue size for WebSocket connections. This controls how many messages can be buffered per connection before messages start getting dropped. Larger values handle bursts better but use more memory. Must be positive.

Default: 256 messages per connection

func (*ListenerConfig) WithServerName added in v0.11.1

func (c *ListenerConfig) WithServerName(name string) *ListenerConfig

WithServerName sets the vinculum server name used in metric attributes.

func (*ListenerConfig) WithSubscriptionController

func (c *ListenerConfig) WithSubscriptionController(factory SubscriptionControllerFactory) *ListenerConfig

WithSubscriptionController sets the subscription controller factory for managing client subscriptions and unsubscriptions. The factory function receives the EventBus and logger and should return a SubscriptionController instance.

The controller can:

  • Allow/deny subscription requests
  • Rewrite subscriptions to different topic patterns
  • Split one subscription into multiple subscriptions
  • Maintain state for complex subscription policies

Default: NewPassthroughSubscriptionController (allows all subscriptions)

func (*ListenerConfig) WithWriteTimeout

func (c *ListenerConfig) WithWriteTimeout(timeout time.Duration) *ListenerConfig

WithWriteTimeout sets the timeout for writing messages to WebSocket clients. This prevents the server from hanging when clients are slow to receive data. Should be short enough to detect slow/dead clients quickly.

Default: 10 seconds

type PassthroughSubscriptionController

type PassthroughSubscriptionController struct {
	// contains filtered or unexported fields
}

PassthroughSubscriptionController is the default SubscriptionController that allows all subscriptions and unsubscriptions without modification by directly calling the EventBus methods.

func (*PassthroughSubscriptionController) Subscribe

func (p *PassthroughSubscriptionController) Subscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

Subscribe allows the subscription as-is by calling EventBus.Subscribe directly.

func (*PassthroughSubscriptionController) Unsubscribe

func (p *PassthroughSubscriptionController) Unsubscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

Unsubscribe allows the unsubscription as-is by calling EventBus.Unsubscribe directly.

func (*PassthroughSubscriptionController) UnsubscribeAll

func (p *PassthroughSubscriptionController) UnsubscribeAll(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber) error

UnsubscribeAll allows the unsubscribe all as-is by calling EventBus.UnsubscribeAll directly.

type SubscriptionController

type SubscriptionController interface {
	// Subscribe is called when a client wants to subscribe to a topic pattern.
	// The controller is responsible for making the actual EventBus.Subscribe call(s).
	// It can:
	//   - Call eventBus.Subscribe with the original pattern to allow requests as-is
	//   - Call eventBus.Subscribe with different patterns to modify the subscription
	//   - Call eventBus.Subscribe multiple times to split into multiple subscriptions
	//   - Return an error without calling eventBus.Subscribe to deny the subscription
	Subscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

	// Unsubscribe is called when a client wants to unsubscribe from a topic pattern.
	// The controller is responsible for making the actual EventBus.Unsubscribe call(s).
	// It can:
	//   - Call eventBus.Unsubscribe with the original pattern to allow as-is
	//   - Call eventBus.Unsubscribe with different patterns to modify the unsubscription
	//   - Call eventBus.Unsubscribe multiple times to handle multiple subscriptions
	//   - Return an error without calling eventBus.Unsubscribe to deny the unsubscription
	Unsubscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

	// UnsubscribeAll is called when a client wants to unsubscribe from all topics.
	// The controller is responsible for making the actual EventBus.UnsubscribeAll call.
	// It can:
	//   - Call eventBus.UnsubscribeAll to allow as-is
	//   - Call specific eventBus.Unsubscribe calls to handle partial unsubscriptions
	//   - Return an error without calling eventBus methods to deny the unsubscribe all
	UnsubscribeAll(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber) error
}

SubscriptionController controls what topic patterns clients can subscribe to and unsubscribe from. It is responsible for making the actual EventBus calls and can reject subscriptions, rewrite them, or split them into multiple subscriptions.

func NewPassthroughSubscriptionController

func NewPassthroughSubscriptionController(logger *zap.Logger) SubscriptionController

NewPassthroughSubscriptionController creates a PassthroughSubscriptionController. This is the default subscription controller factory.

type SubscriptionControllerFactory

type SubscriptionControllerFactory func(logger *zap.Logger) SubscriptionController

SubscriptionControllerFactory creates a SubscriptionController instance. It receives a logger that the controller can use for logging.

func NewTopicPrefixSubscriptionController

func NewTopicPrefixSubscriptionController(prefix string) SubscriptionControllerFactory

NewTopicPrefixSubscriptionController creates a factory for TopicPrefixSubscriptionController.

type TopicPrefixSubscriptionController

type TopicPrefixSubscriptionController struct {
	*PassthroughSubscriptionController
	// contains filtered or unexported fields
}

TopicPrefixSubscriptionController only allows subscriptions to topics with a specific prefix. This is useful for sandboxing client subscriptions. It embeds PassthroughSubscriptionController to delegate EventBus calls when subscriptions are allowed.

func (*TopicPrefixSubscriptionController) Subscribe

func (t *TopicPrefixSubscriptionController) Subscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

Subscribe only allows subscriptions to topics with the configured prefix. If allowed, it delegates to the embedded PassthroughSubscriptionController.

func (*TopicPrefixSubscriptionController) Unsubscribe

func (t *TopicPrefixSubscriptionController) Unsubscribe(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber, topicPattern string) error

Unsubscribe only allows unsubscriptions from topics with the configured prefix. If allowed, it delegates to the embedded PassthroughSubscriptionController.

func (*TopicPrefixSubscriptionController) UnsubscribeAll

func (t *TopicPrefixSubscriptionController) UnsubscribeAll(ctx context.Context, eventBus bus.EventBus, subscriber bus.Subscriber) error

UnsubscribeAll allows unsubscribing from all topics - no prefix restrictions apply. It delegates to the embedded PassthroughSubscriptionController.

type WebSocketMetrics

type WebSocketMetrics struct {
	// contains filtered or unexported fields
}

WebSocketMetrics defines the standard metrics collected by the WebSocket server. This struct holds references to all the OTel metric instruments used for monitoring WebSocket server performance and behavior.

Example

ExampleWebSocketMetrics demonstrates how to set up a WebSocket server with metrics collection.

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	bus "github.com/tsarna/vinculum-bus"
	"github.com/tsarna/vinculum-bus/o11y"
	"github.com/tsarna/vinculum-vws/server"
	"go.uber.org/zap"
)

func main() {
	// Create a logger
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Create an EventBus (using the default implementation)
	eventBus, err := bus.NewEventBus().
		WithLogger(logger).
		WithBufferSize(1000).
		Build()
	if err != nil {
		log.Fatalf("Build() returned error: %v", err)
	}
	defer eventBus.Stop()

	// Start the EventBus
	if err := eventBus.Start(); err != nil {
		log.Fatal(err)
	}

	// Create a standalone meter provider that publishes metrics to the bus
	mp, _ := o11y.NewStandaloneMeterProvider(eventBus, &o11y.StandaloneMetricsConfig{
		Interval:     30 * time.Second,
		MetricsTopic: "$metrics",
		ServiceName:  "websocket-server",
	})
	defer mp.Shutdown(context.Background()) //nolint:errcheck

	// Create the WebSocket listener with metrics
	listener, err := server.NewListener().
		WithEventBus(eventBus).
		WithLogger(logger).
		WithMeterProvider(mp).
		WithQueueSize(512).
		WithPingInterval(30 * time.Second).
		WithEventAuth(server.AllowTopicPrefix("client/")).
		Build()
	if err != nil {
		log.Fatal(err)
	}

	// Set up HTTP server
	http.Handle("/ws", listener)

	// Start server
	srv := &http.Server{
		Addr:    ":8080",
		Handler: http.DefaultServeMux,
	}

	fmt.Println("WebSocket server with metrics running on :8080")
	fmt.Println("Metrics will be published to topic '$metrics' every 30 seconds")
	fmt.Println("Connect to ws://localhost:8080/ws to test")

	// In a real application, you would handle graceful shutdown
	log.Fatal(srv.ListenAndServe())
}

func NewWebSocketMetrics

func NewWebSocketMetrics(serverName string, meter metric.Meter) *WebSocketMetrics

NewWebSocketMetrics creates a new WebSocketMetrics instance using the provided Meter. If the meter is nil, returns nil (no metrics will be collected).

func (*WebSocketMetrics) RecordConnectionActive

func (m *WebSocketMetrics) RecordConnectionActive(ctx context.Context, count int)

RecordConnectionActive updates the active connection count.

func (*WebSocketMetrics) RecordConnectionEnd

func (m *WebSocketMetrics) RecordConnectionEnd(ctx context.Context, duration time.Duration)

RecordConnectionEnd records when a WebSocket connection ends and its duration.

func (*WebSocketMetrics) RecordConnectionError

func (m *WebSocketMetrics) RecordConnectionError(ctx context.Context, errorType string)

RecordConnectionError records connection-level errors (upgrade failures, etc.).

func (*WebSocketMetrics) RecordConnectionStart

func (m *WebSocketMetrics) RecordConnectionStart(ctx context.Context)

RecordConnectionStart records when a new WebSocket connection is established.

func (*WebSocketMetrics) RecordMessageError

func (m *WebSocketMetrics) RecordMessageError(ctx context.Context, errorType string, messageKind string)

RecordMessageError records message processing errors.

func (*WebSocketMetrics) RecordMessageReceived

func (m *WebSocketMetrics) RecordMessageReceived(ctx context.Context, sizeBytes int, messageKind string)

RecordMessageReceived records when a message is received from a client.

func (*WebSocketMetrics) RecordMessageSent

func (m *WebSocketMetrics) RecordMessageSent(ctx context.Context, sizeBytes int, messageType string)

RecordMessageSent records when a message is sent to a client.

func (*WebSocketMetrics) RecordPingSent

func (m *WebSocketMetrics) RecordPingSent(ctx context.Context)

RecordPingSent records when a ping frame is sent to a client.

func (*WebSocketMetrics) RecordPongTimeout

func (m *WebSocketMetrics) RecordPongTimeout(ctx context.Context)

RecordPongTimeout records when a client fails to respond to a ping (dead connection).

func (*WebSocketMetrics) RecordRequest

func (m *WebSocketMetrics) RecordRequest(ctx context.Context, requestKind string) func(error)

RecordRequest records the start of a request and returns a function to record completion. Usage:

recordCompletion := metrics.RecordRequest(ctx, "subscribe")
defer recordCompletion(err)

func (*WebSocketMetrics) RecordWriteTimeout

func (m *WebSocketMetrics) RecordWriteTimeout(ctx context.Context)

RecordWriteTimeout records when a write operation times out.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL