service

package
v1.0.0-alpha.12
Published: Mar 6, 2026 License: MIT Imports: 44 Imported by: 0

README

Service Package

Framework-level service infrastructure for SemStreams, providing service lifecycle management, HTTP server coordination, and configuration management.

Overview

The service package defines the core service architecture for SemStreams: explicit service registration, standardized lifecycle management, and HTTP endpoint coordination. Dependencies are injected through the Dependencies struct, and services are instantiated from configuration.

Services in SemStreams are self-contained units that are explicitly registered via the RegisterAll() function, receive structured dependencies, and can optionally expose HTTP endpoints through a shared server. The Manager coordinates all service lifecycle operations while maintaining clean separation of concerns.

The package supports both mandatory services (always running) and optional services (config-driven), with built-in health monitoring, graceful shutdown, and OpenAPI documentation aggregation.

Installation

import "github.com/c360/semstreams/service"

Core Concepts

Service Interface

Every service must implement the Service interface, providing lifecycle methods (Start/Stop) and health monitoring. Services handle their own configuration parsing and business logic.

Explicit Registration Pattern

Services export Register() functions that are called by RegisterAll() in register.go, enabling clear dependency graphs and testable service registration without global state modification.
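As a rough illustration of this pattern, the sketch below shows a caller-owned registry that each service package adds itself to through an exported function. The Registry, Constructor, and RegisterAll shapes here are simplified assumptions for illustration and are not the package's actual definitions.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Service and Constructor are simplified stand-ins for the package's real types.
type Service interface{ IsHealthy() bool }

type Constructor func(rawConfig json.RawMessage) (Service, error)

var errAlreadyRegistered = errors.New("constructor already registered")

// Registry is a hypothetical constructor table owned by the caller, not a global.
type Registry struct {
	constructors map[string]Constructor
}

func NewRegistry() *Registry {
	return &Registry{constructors: make(map[string]Constructor)}
}

// Register rejects nil constructors and duplicate names.
func (r *Registry) Register(name string, c Constructor) error {
	if c == nil {
		return fmt.Errorf("register %q: nil constructor", name)
	}
	if _, exists := r.constructors[name]; exists {
		return fmt.Errorf("register %q: %w", name, errAlreadyRegistered)
	}
	r.constructors[name] = c
	return nil
}

// Has reports whether a constructor is registered under name.
func (r *Registry) Has(name string) bool {
	_, ok := r.constructors[name]
	return ok
}

// RegisterAll calls each package-level Register function in order
// and stops at the first failure.
func RegisterAll(r *Registry, registerFuncs ...func(*Registry) error) error {
	for _, register := range registerFuncs {
		if err := register(r); err != nil {
			return err
		}
	}
	return nil
}

// noopService and registerNoop play the role of one service package.
type noopService struct{}

func (noopService) IsHealthy() bool { return true }

func registerNoop(r *Registry) error {
	return r.Register("noop", func(json.RawMessage) (Service, error) {
		return noopService{}, nil
	})
}
```

Because the registry is passed in rather than mutated from init(), a test can build a fresh Registry, call RegisterAll with only the packages it needs, and discard it afterwards.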

Dependencies

All external dependencies (NATS client, metrics registry, logger, platform identity, config manager) are injected through the Dependencies struct, following clean dependency injection patterns.

Manager

Central coordinator that manages service lifecycle, owns the shared HTTP server, and aggregates OpenAPI documentation from all services. Acts as both a framework component and a service itself.

ComponentManager Service

Special service that manages component lifecycle (inputs, processors, outputs, storage). Provides HTTP APIs for component health, status, and configuration management. See component package for component architecture and flowgraph for connectivity validation.

Usage

Basic Example
// Exported Register function for explicit registration
func Register(registry *service.Registry) error {
    return registry.Register("my-service", NewMyService)
}

// Constructor following service pattern
func NewMyService(rawConfig json.RawMessage, deps *Dependencies) (Service, error) {
    cfg := &MyServiceConfig{
        Enabled: true,  // default values
        Port:    8080,
    }
    
    // Parse raw JSON configuration
    if len(rawConfig) > 0 {
        if err := json.Unmarshal(rawConfig, cfg); err != nil {
            return nil, fmt.Errorf("invalid my-service config: %w", err)
        }
    }
    
    return &MyService{
        config: cfg,
        nats:   deps.NATSClient,
        logger: deps.Logger,
        platform: deps.Platform,
    }, nil
}

// Service implementation
type MyService struct {
    config *MyServiceConfig
    nats   *natsclient.Client
    logger *slog.Logger
    platform types.PlatformMeta
}

func (s *MyService) Start(ctx context.Context) error {
    s.logger.Info("Starting my-service", "org", s.platform.Org, "platform", s.platform.Platform)
    // Service-specific startup logic
    return nil
}

func (s *MyService) Stop(timeout time.Duration) error {
    s.logger.Info("Stopping my-service")
    // Graceful shutdown logic
    return nil
}

func (s *MyService) IsHealthy() bool {
    return true // Service-specific health check
}

func (s *MyService) GetStatus() ServiceStatus {
    return ServiceStatus{
        Name:    "my-service",
        Healthy: s.IsHealthy(),
        Started: time.Now(), // Placeholder: record the real start time in Start() and return it here
    }
}
Advanced Usage
// Service with HTTP endpoints
type MyService struct {
    // ... fields
}

// Implement HTTPHandler interface for HTTP endpoints
func (s *MyService) RegisterHTTPHandlers(prefix string, mux *http.ServeMux) {
    mux.HandleFunc(prefix+"/status", s.handleStatus)
    mux.HandleFunc(prefix+"/data", s.handleData)
}

func (s *MyService) OpenAPISpec() *OpenAPISpec {
    return &OpenAPISpec{
        Paths: map[string]PathItem{
            "/status": {
                Get: &Operation{
                    Summary:     "Get service status",
                    Description: "Returns current service status",
                    Responses: map[string]Response{
                        "200": {Description: "Service status"},
                    },
                },
            },
        },
    }
}

// Service with runtime configuration support
func (s *MyService) GetRuntimeConfig() map[string]any {
    return map[string]any{
        "enabled": s.config.Enabled,
        "port":    s.config.Port,
    }
}

func (s *MyService) ValidateConfigUpdate(newConfig map[string]any) error {
    // Validate proposed configuration changes
    if port, ok := newConfig["port"].(float64); ok {
        if port != float64(s.config.Port) {
            return fmt.Errorf("port changes require service restart")
        }
    }
    return nil
}

func (s *MyService) ApplyConfigUpdate(newConfig map[string]any) error {
    // Apply runtime configuration changes
    if enabled, ok := newConfig["enabled"].(bool); ok {
        s.config.Enabled = enabled
        s.logger.Info("Updated enabled setting", "enabled", enabled)
    }
    return nil
}
ComponentManager HTTP APIs

ComponentManager service exposes HTTP endpoints for component management:

// GET /api/v1/components - List all managed components
// GET /api/v1/components/{name} - Get specific component details
// GET /api/v1/components/{name}/health - Component health status
// POST /api/v1/components/{name}/start - Start a component
// POST /api/v1/components/{name}/stop - Stop a component

// Connectivity validation endpoints (uses flowgraph internally)
// GET /api/v1/flowgraph - Component connectivity graph
// GET /api/v1/validate/connectivity - Connectivity analysis

For component architecture and connectivity validation details, see component package and flowgraph.

FlowService Runtime Endpoints

FlowService exposes runtime observability endpoints for debugging and monitoring running flows:

// GET /flowbuilder/flows/{id}/runtime/metrics - Component metrics (JSON polling)
// Returns throughput, error rates, queue depth per component
// Poll interval: 2-5s recommended
// Response time: <100ms (90ms timeout)

// GET /flowbuilder/flows/{id}/runtime/health - Component health (JSON polling)
// Returns component status, uptime, last activity
// Poll interval: 5s recommended
// Response time: <200ms (180ms timeout)

// GET /flowbuilder/flows/{id}/runtime/messages - NATS message flow (JSON polling)
// Returns filtered message logger entries for flow components
// Poll interval: 1-2s recommended
// Response time: <100ms (90ms timeout)

// GET /flowbuilder/flows/{id}/runtime/logs - Component logs (SSE streaming)
// Streams real-time logs from all components in the flow
// Connection: Server-Sent Events (text/event-stream)

Runtime Metrics Response (GET /runtime/metrics):

{
  "timestamp": "2025-11-17T14:23:05.123456789Z",
  "components": [
    {
      "name": "udp-source",
      "throughput": 1234.5,
      "error_rate": 0.0,
      "queue_depth": 0,
      "status": "healthy"
    }
  ],
  "prometheus_available": true
}
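A polling client could decode this response into typed structs along the following lines. The JSON field names come from the sample above; the Go type and function names are assumptions made for this sketch.

```go
package main

import (
	"encoding/json"
	"time"
)

// ComponentMetrics mirrors one entry of the "components" array in the sample.
type ComponentMetrics struct {
	Name       string  `json:"name"`
	Throughput float64 `json:"throughput"`
	ErrorRate  float64 `json:"error_rate"`
	QueueDepth int     `json:"queue_depth"`
	Status     string  `json:"status"`
}

// RuntimeMetrics mirrors the top-level response object in the sample.
type RuntimeMetrics struct {
	Timestamp           time.Time          `json:"timestamp"`
	Components          []ComponentMetrics `json:"components"`
	PrometheusAvailable bool               `json:"prometheus_available"`
}

// sampleMetricsJSON is the example response shown above.
const sampleMetricsJSON = `{
  "timestamp": "2025-11-17T14:23:05.123456789Z",
  "components": [
    {"name": "udp-source", "throughput": 1234.5, "error_rate": 0.0,
     "queue_depth": 0, "status": "healthy"}
  ],
  "prometheus_available": true
}`

// decodeRuntimeMetrics parses a response body into RuntimeMetrics.
func decodeRuntimeMetrics(body []byte) (RuntimeMetrics, error) {
	var m RuntimeMetrics
	err := json.Unmarshal(body, &m)
	return m, err
}

// unhealthyComponents returns the names of components whose status is not "healthy".
func unhealthyComponents(m RuntimeMetrics) []string {
	var names []string
	for _, c := range m.Components {
		if c.Status != "healthy" {
			names = append(names, c.Name)
		}
	}
	return names
}
```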

Runtime Health Response (GET /runtime/health):

{
  "timestamp": "2025-11-17T14:23:05.123456789Z",
  "overall": {
    "status": "healthy",
    "running_count": 3,
    "degraded_count": 0,
    "error_count": 0
  },
  "components": [
    {
      "name": "udp-source",
      "type": "udp",
      "status": "running",
      "healthy": true,
      "message": "Processing messages",
      "start_time": "2025-11-17T14:07:33.123Z",
      "last_activity": "2025-11-17T14:23:04.567Z",
      "uptime_seconds": 932,
      "details": null
    }
  ]
}

Runtime Messages Response (GET /runtime/messages?limit=100):

{
  "timestamp": "2025-11-17T14:23:01.123456789Z",
  "messages": [
    {
      "timestamp": "2025-11-17T14:23:01.234567890Z",
      "subject": "process.json-processor.data",
      "message_id": "msg-12345",
      "component": "json-processor",
      "direction": "published",
      "summary": "JSON filter applied",
      "metadata": {"size_bytes": 256},
      "message_type": "ProcessedData"
    }
  ],
  "total": 1,
  "limit": 100
}

Runtime Logs Stream (GET /runtime/logs - SSE):

event: log
data: {"timestamp":"2025-11-17T14:23:01.123Z","level":"INFO","component":"udp-source","message":"Listening on :5000"}

event: log
data: {"timestamp":"2025-11-17T14:23:02.456Z","level":"ERROR","component":"processor","message":"Failed to parse JSON"}

event: ping
data: {"timestamp":"2025-11-17T14:23:05.000Z"}
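For clients that cannot use a ready-made SSE library, the stream format above can be parsed with a line scanner. This is a minimal sketch that handles only the event: and data: fields seen in the sample; the type and function names are illustrative.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// sseEvent holds one parsed Server-Sent Event.
type sseEvent struct {
	Name string // value of the "event:" field
	Data string // "data:" lines joined with newlines
}

// readSSE reads events separated by blank lines until r is exhausted.
// A trailing event without a closing blank line is also returned.
func readSSE(r io.Reader) ([]sseEvent, error) {
	var (
		events []sseEvent
		cur    sseEvent
		data   []string
	)
	flush := func() {
		if cur.Name != "" || len(data) > 0 {
			cur.Data = strings.Join(data, "\n")
			events = append(events, cur)
		}
		cur, data = sseEvent{}, nil
	}
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			flush() // a blank line terminates the current event
		case strings.HasPrefix(line, "event:"):
			cur.Name = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			// Strip the field name and at most one leading space.
			value := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
			data = append(data, value)
		}
	}
	flush()
	return events, sc.Err()
}

// parseSSEString is a convenience wrapper for in-memory input.
func parseSSEString(s string) ([]sseEvent, error) {
	return readSSE(strings.NewReader(s))
}
```

In a real client the reader would be the body of an HTTP response with Content-Type text/event-stream, and events would be handled as they arrive rather than collected into a slice.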

Architecture:

  • Metrics: Three-tier fallback (Prometheus API → raw metrics → health only)
  • Health: Uses ComponentManager health status with timing enhancements
  • Messages: Filters MessageLogger circular buffer by flow component subjects
  • Logs: Aggregates component logs and streams via SSE with reconnection support
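The three-tier metrics fallback listed above amounts to trying sources in priority order and using the first that answers. The sketch below shows that control flow only; the tier names, the fetch signature, and the demo data are assumptions, not the contents of flow_runtime_metrics.go.

```go
package main

import "errors"

// metricsTier is one source of metrics, tried in priority order.
type metricsTier struct {
	name  string
	fetch func() (map[string]float64, error)
}

// firstAvailable returns the name and data of the first tier that succeeds.
// If every tier fails, the individual errors are joined into one.
func firstAvailable(tiers []metricsTier) (string, map[string]float64, error) {
	if len(tiers) == 0 {
		return "", nil, errors.New("no metrics tiers configured")
	}
	var errs []error
	for _, t := range tiers {
		data, err := t.fetch()
		if err == nil {
			return t.name, data, nil
		}
		errs = append(errs, err)
	}
	return "", nil, errors.Join(errs...)
}

// demoTiers builds the three tiers with a switchable Prometheus outage.
func demoTiers(prometheusUp bool) []metricsTier {
	return []metricsTier{
		{"prometheus", func() (map[string]float64, error) {
			if !prometheusUp {
				return nil, errors.New("prometheus unreachable")
			}
			return map[string]float64{"throughput": 1234.5}, nil
		}},
		{"raw", func() (map[string]float64, error) {
			return map[string]float64{"throughput": 1200}, nil
		}},
		{"health-only", func() (map[string]float64, error) {
			return map[string]float64{}, nil
		}},
	}
}
```

Returning the tier name alongside the data lets the handler report which source served the request, in the spirit of the prometheus_available field in the metrics response.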

Performance:

  • All endpoints enforce strict timeouts (<100ms or <200ms)
  • Graceful degradation when optional services unavailable
  • UTC timestamps for consistency across distributed systems
  • Pre-allocated buffers and efficient filtering

Security:

  • Input validation on all query parameters
  • DoS protection via limit enforcement (max 1000 messages)
  • Subject pattern sanitization prevents injection attacks
  • Error messages safe for client exposure
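Limit enforcement of the kind described above can be as small as the following sketch. The 1000-message ceiling is from the list; the default of 100, the choice to clamp rather than reject oversized values, and the function name are assumptions.

```go
package main

import (
	"errors"
	"strconv"
)

const (
	defaultLimit = 100  // assumed default when the parameter is absent
	maxLimit     = 1000 // ceiling stated in the Security notes
)

// parseLimit validates the "limit" query parameter and clamps it to maxLimit.
// The error text is generic so it is safe to return to clients.
func parseLimit(raw string) (int, error) {
	if raw == "" {
		return defaultLimit, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return 0, errors.New("limit must be a positive integer")
	}
	if n > maxLimit {
		n = maxLimit
	}
	return n, nil
}
```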

For implementation details, see:

  • flow_runtime_metrics.go - Prometheus integration with fallback tiers
  • flow_runtime_health.go - Component health aggregation with timing
  • flow_runtime_messages.go - NATS message flow filtering
  • flow_runtime_logs.go - SSE log streaming (if implemented)
WebSocket Status Stream

Real-time flow status updates via WebSocket connection. This endpoint provides unified streaming of flow state changes, component health, metrics, and logs.

// GET /flowbuilder/status/stream?flowId={flowId} - WebSocket status stream
// Connection: WebSocket (ws:// or wss://)
// Real-time streaming of all flow observability data

Connection:

wscat -c "ws://localhost:8080/flowbuilder/status/stream?flowId=my-flow-id"

Message Types (Server → Client):

All messages are wrapped in a StatusStreamEnvelope:

{
    "type": "flow_status",
    "id": "msg-uuid-12345",
    "timestamp": 1705412345000,
    "flow_id": "my-flow-id",
    "payload": { ... }
}
| Type | Trigger | Description |
| --- | --- | --- |
| flow_status | State change | Flow state transitions (deployed, running, stopped, failed) |
| component_health | Every 5s | Component health status from ComponentManager |
| component_metrics | As published | Real-time metrics from MetricsForwarder |
| log_entry | As logged | Application logs via NATS LogForwarder |

Flow Status Payload:

{
    "state": "running",
    "prev_state": "deployed_stopped",
    "timestamps": {
        "created": "2025-01-15T10:00:00Z",
        "deployed": "2025-01-15T10:05:00Z",
        "started": "2025-01-15T10:05:30Z"
    },
    "error": null
}

Component Health Payload:

{
    "udp-input": {
        "healthy": true,
        "status": "running",
        "error_count": 0
    },
    "json-processor": {
        "healthy": true,
        "status": "processing",
        "error_count": 2
    }
}

Log Entry Payload:

{
    "level": "INFO",
    "source": "udp-input",
    "message": "Packet received from 192.168.1.1:5000",
    "fields": {
        "bytes": 1024,
        "remote_addr": "192.168.1.1:5000"
    }
}

Component Metrics Payload:

{
    "component": "udp-input",
    "metrics": [
        {
            "name": "packets_received_total",
            "type": "counter",
            "value": 12345,
            "labels": {"status": "success"}
        }
    ]
}
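A Go client might decode the envelope first and defer payload decoding until the message type is known. The sketch below assumes the JSON field names shown in the samples; the Go struct layouts and the summarize helper are illustrative, and the timestamp is kept as a raw integer because the sample value looks like Unix milliseconds.

```go
package main

import "encoding/json"

// StatusStreamEnvelope wraps every server message; Payload stays raw
// until Type says how to decode it.
type StatusStreamEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	FlowID    string          `json:"flow_id"`
	Payload   json.RawMessage `json:"payload"`
}

type flowStatusPayload struct {
	State     string `json:"state"`
	PrevState string `json:"prev_state"`
}

type logEntryPayload struct {
	Level   string `json:"level"`
	Source  string `json:"source"`
	Message string `json:"message"`
}

// summarize decodes one message and returns a one-line description.
func summarize(raw []byte) (string, error) {
	var env StatusStreamEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", err
	}
	switch env.Type {
	case "flow_status":
		var p flowStatusPayload
		if err := json.Unmarshal(env.Payload, &p); err != nil {
			return "", err
		}
		return env.FlowID + ": " + p.PrevState + " -> " + p.State, nil
	case "log_entry":
		var p logEntryPayload
		if err := json.Unmarshal(env.Payload, &p); err != nil {
			return "", err
		}
		return "[" + p.Level + "] " + p.Source + ": " + p.Message, nil
	default:
		// component_health and component_metrics are not decoded in this sketch.
		return env.Type + " (payload not decoded)", nil
	}
}
```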

Client Commands (Client → Server):

Clients can filter what messages they receive:

{
    "command": "subscribe",
    "message_types": ["flow_status", "log_entry"],
    "log_level": "WARN",
    "sources": ["udp-input", "json-processor"]
}
| Field | Description |
| --- | --- |
| message_types | Filter which message types to receive |
| log_level | Minimum log level: DEBUG, INFO, WARN, ERROR |
| sources | Filter logs/metrics by component names |
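One plausible way to evaluate such a subscription per message is sketched below. It assumes that an empty filter means "no restriction", that log_level applies only to log_entry messages, and that sources applies only to log_entry and component_metrics; the actual server-side rules may differ.

```go
package main

import "slices"

// subscription mirrors the subscribe command fields.
type subscription struct {
	MessageTypes []string `json:"message_types"`
	LogLevel     string   `json:"log_level"`
	Sources      []string `json:"sources"`
}

// levelRank orders the documented log levels; unknown levels rank as DEBUG.
var levelRank = map[string]int{"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}

// allows reports whether a message passes the subscription's filters.
func (s subscription) allows(msgType, level, source string) bool {
	if len(s.MessageTypes) > 0 && !slices.Contains(s.MessageTypes, msgType) {
		return false
	}
	if msgType == "log_entry" && s.LogLevel != "" &&
		levelRank[level] < levelRank[s.LogLevel] {
		return false
	}
	filtersBySource := msgType == "log_entry" || msgType == "component_metrics"
	if filtersBySource && len(s.Sources) > 0 && !slices.Contains(s.Sources, source) {
		return false
	}
	return true
}
```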

Architecture:

The WebSocket status stream uses NATS as the backbone for all real-time data:

┌─────────────────────────────────────────────────────────────┐
│                    Application                               │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────┐ │
│  │ slog.Logger │  │MetricsForward│  │ ComponentManager    │ │
│  └──────┬──────┘  └──────┬───────┘  └──────────┬──────────┘ │
└─────────┼────────────────┼───────────────────────┼───────────┘
          │                │                       │
          ▼                ▼                       │
   ┌──────────────────────────────────────┐       │
   │         NATS JetStream               │       │
   │  ┌────────┐  ┌──────────┐            │       │
   │  │ logs.> │  │ metrics.>│            │       │
   │  └────────┘  └──────────┘            │       │
   └──────────────────┬───────────────────┘       │
                      │                           │
                      ▼                           ▼
            ┌─────────────────────────────────────────┐
            │        WebSocket Status Stream          │
            │  ┌────────────┐  ┌────────────────────┐ │
            │  │logStreamer │  │metricsStreamer     │ │
            │  │flowWatcher │  │healthTicker        │ │
            │  └────────────┘  └────────────────────┘ │
            └─────────────────────────────────────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │  WebSocket Client │
                    │  (Frontend UI)    │
                    └──────────────────┘

Log Architecture:

Logs flow through the system using the logging package:

  1. Application code calls slog.Info(), slog.Error(), etc.
  2. MultiHandler dispatches to both TextHandler (stdout) and NATSLogHandler
  3. NATSLogHandler publishes to NATS logs.{source}.{level} subjects
  4. LOGS JetStream stream stores logs with 1hr TTL and 100MB limit
  5. WebSocket's logStreamer subscribes to logs.> and forwards to clients

This architecture ensures:

  • No timing issues: Logs publish to NATS at handler creation time
  • Out-of-band logs: Always available via NATS even without WebSocket
  • Graceful fallback: NATS failures don't block stdout logging
  • Source filtering: exclude_sources config prevents feedback loops
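The fan-out and graceful-fallback behavior can be pictured with a small slog.Handler that forwards each record to several children. This is a sketch of the general technique, not the code in pkg/logging; failingPublisher is a hypothetical stand-in for a NATS-backed handler whose connection is down.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"log/slog"
	"strings"
)

// multiHandler forwards every record to all child handlers.
type multiHandler struct{ handlers []slog.Handler }

func (m multiHandler) Enabled(ctx context.Context, level slog.Level) bool {
	for _, h := range m.handlers {
		if h.Enabled(ctx, level) {
			return true
		}
	}
	return false
}

// Handle calls every enabled child; one child's error does not stop the others.
func (m multiHandler) Handle(ctx context.Context, r slog.Record) error {
	var errs []error
	for _, h := range m.handlers {
		if h.Enabled(ctx, r.Level) {
			errs = append(errs, h.Handle(ctx, r.Clone()))
		}
	}
	return errors.Join(errs...)
}

func (m multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	out := make([]slog.Handler, len(m.handlers))
	for i, h := range m.handlers {
		out[i] = h.WithAttrs(attrs)
	}
	return multiHandler{handlers: out}
}

func (m multiHandler) WithGroup(name string) slog.Handler {
	out := make([]slog.Handler, len(m.handlers))
	for i, h := range m.handlers {
		out[i] = h.WithGroup(name)
	}
	return multiHandler{handlers: out}
}

// failingPublisher simulates a publisher whose broker is unavailable.
type failingPublisher struct{ attempts *int }

func (f failingPublisher) Enabled(context.Context, slog.Level) bool { return true }
func (f failingPublisher) Handle(context.Context, slog.Record) error {
	*f.attempts++
	return errors.New("publish failed: connection closed")
}
func (f failingPublisher) WithAttrs([]slog.Attr) slog.Handler { return f }
func (f failingPublisher) WithGroup(string) slog.Handler      { return f }

// demoFanOut logs once and reports whether the text output still received
// the record even though the publisher failed.
func demoFanOut() (stdoutHasRecord bool, publishAttempts int) {
	var stdout bytes.Buffer
	logger := slog.New(multiHandler{handlers: []slog.Handler{
		slog.NewTextHandler(&stdout, nil),
		failingPublisher{attempts: &publishAttempts},
	}})
	logger.Info("hello")
	return strings.Contains(stdout.String(), "msg=hello"), publishAttempts
}
```

The text handler is listed first and the loop never returns early, so a broken publisher costs one failed attempt per record but never suppresses stdout output.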

Configuration:

Log forwarding is configured via log-forwarder service config:

{
    "services": {
        "log-forwarder": {
            "enabled": true,
            "config": {
                "min_level": "INFO",
                "exclude_sources": ["flow-service.websocket"]
            }
        }
    }
}
| Field | Description |
| --- | --- |
| min_level | Minimum log level to publish to NATS |
| exclude_sources | Source prefixes to exclude (prevents feedback loops) |
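The two fields combine into a simple publish decision: drop records below min_level, then drop records whose source starts with an excluded prefix. The sketch below is one way to express that, assuming level names follow slog's spelling; forwarderConfig and shouldPublish are hypothetical names.

```go
package main

import (
	"log/slog"
	"strings"
)

// forwarderConfig mirrors the "config" object of the log-forwarder service.
type forwarderConfig struct {
	MinLevel       string   `json:"min_level"`
	ExcludeSources []string `json:"exclude_sources"`
}

// parseLevel converts names such as "INFO" or "WARN" into a slog.Level.
func parseLevel(name string) (slog.Level, error) {
	var level slog.Level
	err := level.UnmarshalText([]byte(name))
	return level, err
}

// shouldPublish reports whether a record from source at level would be forwarded.
func shouldPublish(cfg forwarderConfig, source, level string) (bool, error) {
	minLevel, err := parseLevel(cfg.MinLevel)
	if err != nil {
		return false, err
	}
	recordLevel, err := parseLevel(level)
	if err != nil {
		return false, err
	}
	if recordLevel < minLevel {
		return false, nil
	}
	for _, prefix := range cfg.ExcludeSources {
		// Prefix matching also excludes sub-sources of an excluded source.
		if strings.HasPrefix(source, prefix) {
			return false, nil
		}
	}
	return true, nil
}
```

Excluding the WebSocket handler's own source matters because forwarding its logs would produce new stream traffic, which would be logged and forwarded again.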

Implementation Files:

  • flow_runtime_stream.go - WebSocket handler, client state, worker goroutines
  • flow_runtime_stream_test.go - Unit tests
  • flow_runtime_stream_integration_test.go - Integration tests with real NATS
  • pkg/logging/ - MultiHandler, NATSLogHandler

API Reference

Types
Service

Primary interface that all services must implement.

type Service interface {
    Start(ctx context.Context) error    // Start service with context
    Stop(timeout time.Duration) error   // Stop service with timeout
    IsHealthy() bool                    // Health check
    GetStatus() ServiceStatus           // Service status for monitoring
}
Dependencies

Dependency injection structure for service construction.

type Dependencies struct {
    NATSClient      *natsclient.Client        // Required: NATS messaging client
    MetricsRegistry *metric.MetricsRegistry   // Optional: Prometheus metrics
    Logger          *slog.Logger              // Optional: structured logger (defaults to slog.Default())
    Platform        types.PlatformMeta        // Required: platform identity (org + platform)
    Manager         *config.Manager           // Optional: centralized configuration management
}
Manager

Central service coordinator and HTTP server owner.

type Manager struct {
    // Thread-safe service lifecycle management and HTTP server coordination
}
Functions
RegisterConstructor(name string, constructor Constructor)

Registers a service constructor with the ServiceRegistry. Called by RegisterAll() during service initialization.

(m *Manager) CreateService(name string, rawConfig json.RawMessage, deps *Dependencies) (Service, error)

Creates a service instance using the registered constructor with proper dependency injection.

(m *Manager) StartAll(ctx context.Context) error

Starts all created services in registration order with proper error handling.

(m *Manager) StopAll(timeout time.Duration) error

Stops all services in reverse order with graceful shutdown and timeout handling.
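The ordering contract of StartAll and StopAll can be sketched as follows. This is not the Manager's implementation: the namedService slice, the stop-on-first-start-error behavior, and the error joining on shutdown are assumptions chosen to keep the example short.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Service is a reduced version of the package's Service interface.
type Service interface {
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

type namedService struct {
	name string
	svc  Service
}

// startAll starts services in slice (registration) order and returns the first error.
func startAll(ctx context.Context, services []namedService) error {
	for _, s := range services {
		if err := s.svc.Start(ctx); err != nil {
			return fmt.Errorf("start %s: %w", s.name, err)
		}
	}
	return nil
}

// stopAll stops services in reverse order, giving each the same timeout,
// and keeps going after a failure so every service gets a chance to stop.
func stopAll(services []namedService, timeout time.Duration) error {
	var errs []error
	for i := len(services) - 1; i >= 0; i-- {
		s := services[i]
		if err := s.svc.Stop(timeout); err != nil {
			errs = append(errs, fmt.Errorf("stop %s: %w", s.name, err))
		}
	}
	return errors.Join(errs...)
}

// recorder is a test double that appends lifecycle events to a shared log.
type recorder struct {
	name   string
	events *[]string
}

func (r recorder) Start(context.Context) error {
	*r.events = append(*r.events, "start "+r.name)
	return nil
}

func (r recorder) Stop(time.Duration) error {
	*r.events = append(*r.events, "stop "+r.name)
	return nil
}

// demoOrder runs a start/stop cycle over two services and returns the event log.
func demoOrder() []string {
	var events []string
	services := []namedService{
		{"a", recorder{"a", &events}},
		{"b", recorder{"b", &events}},
	}
	_ = startAll(context.Background(), services)
	_ = stopAll(services, 5*time.Second)
	return events
}
```

Reverse-order shutdown means a service that was started later, and may depend on earlier ones, is stopped before the services it relies on.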

(cm *ComponentManager) ListComponents() []component.Discoverable

Returns all managed components for introspection and monitoring.

(cm *ComponentManager) GetComponent(name string) (component.Discoverable, bool)

Retrieves a specific managed component by name. Returns false if component not found.

Interfaces
HTTPHandler
type HTTPHandler interface {
    RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
    OpenAPISpec() *OpenAPISpec
}

Optional interface for services that want to expose HTTP endpoints. Manager automatically registers handlers and aggregates OpenAPI documentation.
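Automatic registration of this kind usually comes down to a type assertion against the optional interface. The sketch below shows the idea on a shared http.ServeMux; the /services/{name} prefix scheme is an assumption, and the interface is reduced to RegisterHTTPHandlers, leaving out OpenAPISpec.

```go
package main

import (
	"io"
	"net/http"
	"net/http/httptest"
)

// HTTPHandler is a reduced version of the optional interface.
type HTTPHandler interface {
	RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
}

// pingService exposes one endpoint under whatever prefix it is given.
type pingService struct{}

func (pingService) RegisterHTTPHandlers(prefix string, mux *http.ServeMux) {
	mux.HandleFunc(prefix+"/status", func(w http.ResponseWriter, _ *http.Request) {
		io.WriteString(w, "ok")
	})
}

// mountAll registers every service that implements HTTPHandler on the shared mux.
// Services without HTTP endpoints are skipped silently.
func mountAll(mux *http.ServeMux, services map[string]any) {
	for name, svc := range services {
		if h, ok := svc.(HTTPHandler); ok {
			h.RegisterHTTPHandlers("/services/"+name, mux)
		}
	}
}

// demoRequest issues an in-memory GET against a mux with one HTTP-capable
// service ("ping") and one plain service ("plain").
func demoRequest(path string) (int, string) {
	mux := http.NewServeMux()
	mountAll(mux, map[string]any{"ping": pingService{}, "plain": struct{}{}})
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}
```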

RuntimeConfigurable
type RuntimeConfigurable interface {
    GetRuntimeConfig() map[string]any
    ValidateConfigUpdate(newConfig map[string]any) error
    ApplyConfigUpdate(newConfig map[string]any) error
}

Optional interface for services that support runtime configuration changes without restart.

Architecture

Design Decisions

Explicit Service Registration: Chose RegisterAll() orchestration over init() self-registration

  • Clear dependency graphs and testable registration without global state modification
  • Clean dependency management through imports
  • Explicit control over service availability

Centralized HTTP Server: Manager owns single HTTP server shared by all services

  • Eliminates port conflicts and resource waste
  • Unified OpenAPI documentation and routing
  • Consistent URL patterns and middleware

Constructor Pattern: Standardized service constructor signature matching dependency injection

  • Services handle their own configuration parsing and validation
  • Enables flexible per-service configuration schemas
  • Clean separation between framework and service logic

Configuration-Driven Instantiation: Services created only if registered AND configured

  • Clear distinction between available (registered) and active (configured)
  • Supports optional services with graceful degradation
  • Environment-specific service composition

ComponentManager Integration: Special service for managing component lifecycle

  • Manages component startup/shutdown and health monitoring
  • Provides HTTP APIs for component introspection and control
  • Integrates with component package for connectivity validation
  • Enables runtime component management and debugging
Integration Points
  • Dependencies: NATS client (required), Platform (required), MetricsRegistry (optional), Logger (optional), config Manager (optional)
  • Used By: Main application for service orchestration, individual services for HTTP endpoints
  • Component Integration: ComponentManager service integrates with component package for lifecycle management
  • Data Flow: Configuration → Constructor → Service Instance → Manager → HTTP Endpoints

Configuration

Required Configuration
{
  "services": {
    "http_port": 8080,
    "swagger_ui": true,
    "component-manager": {
      "enabled": true
    },
    "metrics": {
      "enabled": true,
      "port": 9090,
      "path": "/metrics"
    }
  }
}
Optional Configuration
{
  "services": {
    "discovery": {
      "enabled": false
    },
    "message-logger": {
      "enabled": false,
      "max_messages": 1000
    },
    "service-manager": {
      "read_timeout": "10s",
      "write_timeout": "10s",
      "shutdown_timeout": "30s"
    }
  }
}

Error Handling

Error Types

This package defines the following error patterns:

var (
    // Service registration errors
    ErrServiceAlreadyExists = errors.New("service: constructor already registered")
    ErrInvalidConstructor   = errors.New("service: invalid constructor function")

    // Service lifecycle errors
    ErrServiceNotFound = errors.New("service: service not found")
    ErrServiceStartup  = errors.New("service: failed to start")
    ErrServiceShutdown = errors.New("service: failed to stop gracefully")

    // HTTP server errors
    ErrHTTPServerStartup = errors.New("service: failed to start HTTP server")
    ErrPortInUse         = errors.New("service: HTTP port already in use")
)
Error Detection
_, err := manager.CreateService("my-service", config, deps)
if errors.Is(err, service.ErrServiceNotFound) {
    // Handle missing service constructor
}

err = manager.StartAll(ctx)
if errors.Is(err, service.ErrServiceStartup) {
    // Handle service startup failure
}
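errors.Is matches a sentinel only when it appears somewhere in the returned error's wrap chain, for example through fmt.Errorf with %w. The sketch below demonstrates that mechanism with a locally declared sentinel; whether the package wraps its sentinels this way is an assumption, and createService here is a stand-in rather than Manager.CreateService.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrServiceNotFound is declared locally so the example is self-contained.
var ErrServiceNotFound = errors.New("service: service not found")

// createService fails with a wrapped sentinel when name is not registered.
func createService(name string, registered map[string]bool) error {
	if !registered[name] {
		// %w keeps the sentinel in the chain while adding context.
		return fmt.Errorf("create %q: %w", name, ErrServiceNotFound)
	}
	return nil
}

// isNotFound reports whether err was caused by a missing constructor.
func isNotFound(err error) bool {
	return errors.Is(err, ErrServiceNotFound)
}
```

Comparing with == instead of errors.Is would miss the wrapped error, since the returned value is a new error that only contains the sentinel.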

Testing

Test Utilities

This package provides comprehensive test utilities for service testing:

// ServiceSuite provides NATS testcontainer and common setup
type ServiceSuite struct {
    natsClient *natsclient.TestClient
    manager    *Manager
    deps       *Dependencies
}

// Use in service tests
func (s *MyServiceSuite) SetupTest() {
    s.ServiceSuite.SetupTest()
    
    // Register and create your service (config is the service's json.RawMessage)
    service.RegisterConstructor("my-service", NewMyService)
    svc, err := s.manager.CreateService("my-service", config, s.deps)
    s.Require().NoError(err)
    s.service = svc // Stored for the lifecycle test below
}

// Test service lifecycle
func (s *MyServiceSuite) TestMyService_Lifecycle() {
    err := s.service.Start(context.Background())
    s.Assert().NoError(err)
    s.Assert().True(s.service.IsHealthy())
    
    err = s.service.Stop(5 * time.Second)
    s.Assert().NoError(err)
}
Testing Patterns
  • Use ServiceSuite for integration tests with real NATS via testcontainers
  • Test service behavior through Service interface methods
  • Verify HTTP endpoints using httptest.ResponseRecorder
  • Test configuration parsing with various JSON inputs
  • Validate graceful shutdown and resource cleanup

For component-specific testing (including connectivity validation), see component package.

Performance Considerations

  • Concurrency: All Manager operations are thread-safe using read-write mutex
  • Memory: Services maintain references until explicitly stopped and removed
  • HTTP Performance: Single shared server eliminates overhead of multiple HTTP listeners
  • Startup Time: Services start in parallel where possible, sequentially where dependencies exist
  • Component Lifecycle: ComponentManager caches connectivity analysis for efficient repeated access

Examples

Example 1: Simple Monitoring Service
package main

import (
    "context"
    "encoding/json"
    "log"
    "log/slog"
    "net/http"
    "time"
    
    "github.com/c360/semstreams/service"
    "github.com/c360/semstreams/types"
)

// MonitoringService tracks system metrics
type MonitoringService struct {
    config   *MonitoringConfig
    platform types.PlatformMeta
    logger   *slog.Logger
    ticker   *time.Ticker
}

type MonitoringConfig struct {
    Enabled  bool          `json:"enabled"`
    Interval time.Duration `json:"interval"`
}

func NewMonitoringService(rawConfig json.RawMessage, deps *service.Dependencies) (service.Service, error) {
    cfg := &MonitoringConfig{
        Enabled:  true,
        Interval: 30 * time.Second,
    }
    
    if len(rawConfig) > 0 {
        if err := json.Unmarshal(rawConfig, cfg); err != nil {
            return nil, err
        }
    }
    
    return &MonitoringService{
        config:   cfg,
        platform: deps.Platform,
        logger:   deps.Logger,
    }, nil
}

func (m *MonitoringService) Start(ctx context.Context) error {
    if !m.config.Enabled {
        m.logger.Info("Monitoring service disabled")
        return nil
    }
    
    m.ticker = time.NewTicker(m.config.Interval)
    go m.monitoringLoop(ctx)
    
    m.logger.Info("Started monitoring service",
        "interval", m.config.Interval,
        "platform", m.platform.Platform)
    return nil
}

func (m *MonitoringService) Stop(timeout time.Duration) error {
    if m.ticker != nil {
        m.ticker.Stop()
    }
    m.logger.Info("Stopped monitoring service")
    return nil
}

func (m *MonitoringService) IsHealthy() bool {
    return m.config.Enabled && m.ticker != nil
}

func (m *MonitoringService) GetStatus() service.ServiceStatus {
    return service.ServiceStatus{
        Name:    "monitoring",
        Healthy: m.IsHealthy(),
        Details: map[string]any{
            "enabled":  m.config.Enabled,
            "interval": m.config.Interval.String(),
            "platform": m.platform.Platform,
        },
    }
}

func (m *MonitoringService) monitoringLoop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-m.ticker.C:
            m.logger.Debug("Monitoring tick", "platform", m.platform.Platform)
            // Monitoring logic here
        }
    }
}

// HTTP endpoints
func (m *MonitoringService) RegisterHTTPHandlers(prefix string, mux *http.ServeMux) {
    mux.HandleFunc(prefix+"/status", m.handleStatus)
    mux.HandleFunc(prefix+"/metrics", m.handleMetrics)
}

func (m *MonitoringService) OpenAPISpec() *service.OpenAPISpec {
    return &service.OpenAPISpec{
        Paths: map[string]service.PathItem{
            "/status": {
                Get: &service.Operation{
                    Summary: "Get monitoring status",
                    Responses: map[string]service.Response{
                        "200": {Description: "Monitoring status"},
                    },
                },
            },
        },
    }
}

func (m *MonitoringService) handleStatus(w http.ResponseWriter, r *http.Request) {
    status := m.GetStatus()
    json.NewEncoder(w).Encode(status)
}

func (m *MonitoringService) handleMetrics(w http.ResponseWriter, r *http.Request) {
    metrics := map[string]any{
        "platform": m.platform.Platform,
        "uptime":   time.Since(time.Now()), // Would track actual uptime
    }
    json.NewEncoder(w).Encode(metrics)
}

// Explicit registration via exported function
func Register(registry *service.Registry) error {
    return registry.Register("monitoring", NewMonitoringService)
}

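func main() {
    // Registration is explicit: RegisterAll() is expected to call Register(registry)
    // before the Manager creates any services.
    log.Println("Monitoring service defined; register it via Register(registry)")
}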
Example 2: Service Coordination and Management
package main

import (
    "context"
    "encoding/json"
    "log"
    "log/slog"
    "time"
    
    "github.com/c360/semstreams/service"
    "github.com/c360/semstreams/types"
    "github.com/c360/semstreams/natsclient"
    "github.com/c360/semstreams/metric"
)

func main() {
    // Create dependencies
    natsClient, err := natsclient.NewClient("nats://localhost:4222")
    if err != nil {
        log.Fatalf("Failed to create NATS client: %v", err)
    }
    metricsRegistry := metric.NewMetricsRegistry()
    platform := types.PlatformMeta{
        Org:      "example",
        Platform: "demo-platform",
    }
    
    deps := &service.Dependencies{
        NATSClient:      natsClient,
        MetricsRegistry: metricsRegistry,
        Logger:          slog.Default(),
        Platform:        platform,
    }
    
    // Get the default Manager
    manager := service.DefaultManager
    
    // Configure HTTP server
    manager.SetHTTPConfig(8080, true, service.InfoSpec{
        Title:   "Demo Services",
        Version: "1.0.0",
    })
    
    // Services are registered via RegisterAll()
    // Create services from configuration
    serviceConfigs := map[string]json.RawMessage{
        // interval is 10s; a time.Duration field decodes from integer nanoseconds, not "10s"
        "monitoring": json.RawMessage(`{"enabled": true, "interval": 10000000000}`),
        "metrics":    json.RawMessage(`{"enabled": true, "port": 9090}`),
    }
    
    // Create all configured services
    for name, config := range serviceConfigs {
        if _, err := manager.CreateService(name, config, deps); err != nil {
            log.Printf("Failed to create service %s: %v", name, err)
            continue
        }
        log.Printf("Created service: %s", name)
    }
    
    // Start all services
    ctx := context.Background()
    if err := manager.StartAll(ctx); err != nil {
        log.Fatalf("Failed to start services: %v", err)
    }
    
    log.Println("All services started")
    log.Println("HTTP server available at http://localhost:8080")
    log.Println("API documentation at http://localhost:8080/docs")
    
    // Check service health
    for name, svc := range manager.GetAllServices() {
        if svc.IsHealthy() {
            log.Printf("Service %s: healthy", name)
        } else {
            log.Printf("Service %s: unhealthy", name)
        }
    }
    
    // Simulate running for a while
    time.Sleep(30 * time.Second)
    
    // Graceful shutdown
    log.Println("Shutting down services...")
    if err := manager.StopAll(10 * time.Second); err != nil {
        log.Printf("Error during shutdown: %v", err)
    }
    
    log.Println("All services stopped")
}

Known Limitations

  • HTTP server configuration cannot be changed at runtime (requires restart)
  • Service dependencies must be acyclic (enforced through import structure)
  • OpenAPI spec aggregation assumes unique operation IDs across services
  • Graceful shutdown timeout applies to all services equally (no per-service timeouts)

Related Packages

  • pkg/component: ComponentManager service uses component Registry for lifecycle management
  • pkg/types: Provides PlatformMeta and other shared types
  • pkg/natsclient: NATS client dependency for service messaging
  • pkg/metric: Optional metrics collection for services
  • pkg/config: Configuration management and Manager integration

Documentation

Overview

Package service provides base functionality and common patterns for long-running services in the semstreams platform. It includes health monitoring, lifecycle management, and metric collection capabilities.

Package service provides service management and HTTP APIs for the SemStreams platform.

Package service provides service lifecycle management, HTTP server coordination, and component orchestration for the StreamKit platform.

The service package implements a sophisticated service architecture with clearly separated responsibilities across multiple service types:

Core Service Types

BaseService: Foundation for all services with standardized lifecycle management:

  • Lifecycle states: Stopped → Starting → Running → Stopping
  • Health monitoring with periodic checks
  • Metrics integration with CoreMetrics registry
  • Context-based cancellation and graceful shutdown
  • Dependency injection through Dependencies
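The lifecycle states listed above suggest a small guarded state machine. The sketch below is one way to enforce the transitions under a mutex; it assumes Stopping returns to Stopped and ignores failure paths such as Starting falling back to Stopped, which BaseService may handle differently.

```go
package main

import (
	"fmt"
	"sync"
)

// State enumerates the lifecycle states named in the list above.
type State int

const (
	Stopped State = iota
	Starting
	Running
	Stopping
)

var stateNames = [...]string{"Stopped", "Starting", "Running", "Stopping"}

func (s State) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return "Unknown"
	}
	return stateNames[s]
}

// nextState maps each state to its only permitted successor.
var nextState = map[State]State{
	Stopped:  Starting,
	Starting: Running,
	Running:  Stopping,
	Stopping: Stopped, // assumed: the cycle closes back to Stopped
}

// lifecycle guards the current state with a mutex.
type lifecycle struct {
	mu    sync.Mutex
	state State
}

// transition moves to the requested state if it is the permitted successor.
func (l *lifecycle) transition(to State) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if nextState[l.state] != to {
		return fmt.Errorf("invalid transition %s -> %s", l.state, to)
	}
	l.state = to
	return nil
}

// current returns the state at the time of the call.
func (l *lifecycle) current() State {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.state
}
```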

Manager: Central orchestration of HTTP server and service lifecycle:

  • HTTP server management with graceful shutdown
  • Service registration and dependency injection
  • Two-phase HTTP initialization (system endpoints → service endpoints)
  • Health aggregation across all services
  • OpenAPI documentation aggregation

ComponentManager: Dynamic component lifecycle management:

  • Component instantiation from registry factories
  • Flow-based component deployment
  • Runtime configuration updates via NATS KV
  • Flow graph validation with connectivity analysis
  • Health monitoring of managed components

FlowService: Visual flow builder HTTP API:

  • CRUD operations for flow definitions
  • Flow deployment via Engine integration
  • Real-time flow status monitoring
  • Validation feedback with detailed errors

Service Patterns

All services follow standardized patterns:

Constructor Pattern with Dependency Injection:

type MyService struct {
    *BaseService
    // service-specific fields
}

func NewMyService(deps Dependencies, config MyConfig) (*MyService, error) {
    base := NewBaseService("my-service", deps)
    svc := &MyService{BaseService: base}
    // Initialize service-specific fields
    return svc, nil
}

Lifecycle Implementation:

func (s *MyService) Initialize(ctx context.Context) error {
    // One-time initialization
    return s.BaseService.Initialize(ctx)
}

func (s *MyService) Start(ctx context.Context) error {
    // Start background operations
    return s.BaseService.Start(ctx)
}

func (s *MyService) Stop(timeout time.Duration) error {
    // Graceful shutdown, bounded by timeout (matches BaseService.Stop)
    return s.BaseService.Stop(timeout)
}

HTTP Handler Integration:

func (s *MyService) RegisterHTTPHandlers(mux *http.ServeMux) {
    mux.HandleFunc("/api/v1/myservice/", s.handleRequest)
}

func (s *MyService) OpenAPISpec() map[string]any {
    return map[string]any{
        "paths": map[string]any{
            // The element type is any, so the nested literal needs an explicit type.
            "/api/v1/myservice/": map[string]any{
                "get": map[string]any{
                    "summary": "My service endpoint",
                    "responses": map[string]any{
                        "200": map[string]any{
                            "description": "Success",
                        },
                    },
                },
            },
        },
    }
}

Service Registration

Services are registered with Manager using constructor functions:

manager := service.NewServiceManager(deps)

// Register services
manager.RegisterConstructor("my-service", func(deps Dependencies) (Service, error) {
    return NewMyService(deps, config)
})

// Initialize and start all services
if err := manager.InitializeAll(ctx); err != nil {
    log.Fatal(err)
}
if err := manager.StartAll(ctx); err != nil {
    log.Fatal(err)
}

HTTP Server Management

Manager coordinates HTTP server lifecycle with two-phase initialization:

  1. Early Phase (initializeHTTPInfrastructure):
     • System endpoints registered: /health, /readyz, /metrics
     • HTTP server created but not started

  2. Late Phase (completeHTTPSetup):
     • Service endpoints registered after services start
     • OpenAPI documentation aggregated
     • HTTP server starts listening

This prevents race conditions and ensures system endpoints are available before service-specific endpoints.
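
The two phases can be sketched with the standard library alone. This is a minimal illustration, not the Manager's actual code: newSystemMux, addServiceRoutes, and statusFor are hypothetical names, and the handlers are placeholders.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newSystemMux is phase one: only system endpoints exist and nothing is listening yet.
func newSystemMux() *http.ServeMux {
	mux := http.NewServeMux()
	for _, path := range []string{"/health", "/readyz", "/metrics"} {
		mux.HandleFunc(path, func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK) // placeholder reply
		})
	}
	return mux
}

// addServiceRoutes is phase two: called once services have started.
func addServiceRoutes(mux *http.ServeMux) {
	mux.HandleFunc("/api/v1/myservice/", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
}

// statusFor sends a GET through the mux in memory, without opening a socket.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newSystemMux()
	fmt.Println(statusFor(mux, "/api/v1/myservice/")) // 404: service routes not registered yet

	addServiceRoutes(mux)
	fmt.Println(statusFor(mux, "/health"), statusFor(mux, "/api/v1/myservice/")) // 200 200

	// A real server would start listening only at this point,
	// for example with http.ListenAndServe(addr, mux).
}
```

Because listening is deferred until both phases are complete, no client can observe a half-registered route table.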

Health Monitoring

Services implement health checks through BaseService:

// Override health check logic
func (s *MyService) healthCheck() error {
    if !s.isHealthy {
        return fmt.Errorf("service unhealthy: %v", s.lastError)
    }
    return nil
}

Health status is aggregated by Manager:

  • /health - Returns 200 if any service is healthy
  • /readyz - Returns 200 if all services are healthy
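
The aggregation rule above amounts to an "any" check for /health and an "all" check for /readyz. A minimal sketch, assuming per-service health is available as a map of booleans; the function names and the 503 status for the failing case are assumptions, since the text only specifies when 200 is returned.

```go
package main

import (
	"fmt"
	"net/http"
)

// anyHealthy reports whether at least one service is healthy (the /health rule).
func anyHealthy(states map[string]bool) bool {
	for _, ok := range states {
		if ok {
			return true
		}
	}
	return false
}

// allHealthy reports whether every service is healthy (the /readyz rule).
// An empty map counts as ready in this sketch.
func allHealthy(states map[string]bool) bool {
	for _, ok := range states {
		if !ok {
			return false
		}
	}
	return true
}

// statusCode maps an aggregate result to an HTTP status.
// 503 for the unhealthy case is an assumption, not taken from the package.
func statusCode(ok bool) int {
	if ok {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

func main() {
	states := map[string]bool{"flow-service": true, "heartbeat": false}
	fmt.Println("/health:", statusCode(anyHealthy(states))) // /health: 200
	fmt.Println("/readyz:", statusCode(allHealthy(states))) // /readyz: 503
}
```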

Metrics Integration

Services automatically register metrics with CoreMetrics:

  • semstreams_service_status - Current service status (gauge)
  • semstreams_messages_received_total - Message counter
  • semstreams_messages_processed_total - Processing counter
  • semstreams_health_checks_total - Health check counter

Component Management

ComponentManager integrates with the component registry and flow engine:

cm := service.NewComponentManager(deps, flowEngine, configMgr)

// Deploy flow with components
err := cm.DeployFlow(ctx, flowID, flowDef)

// Runtime configuration updates
err = cm.UpdateComponentConfig(ctx, componentName, newConfig)

// Health monitoring (returns status for all managed components)
statuses := cm.GetComponentStatus()

Error Handling

Services follow StreamKit error handling patterns:

  • Configuration errors: Return during construction
  • Initialization errors: Return from Initialize()
  • Runtime errors: Log and update health status
  • Shutdown errors: Log but continue graceful shutdown

Use project error wrapping for context:

import "github.com/c360studio/semstreams/pkg/errs"

if err := validateConfig(cfg); err != nil {
    return errs.WrapInvalid(err, "my-service", "NewMyService", "validate config")
}

Graceful Shutdown

Manager coordinates graceful shutdown in reverse order:

  1. Stop accepting new HTTP requests
  2. Stop services in reverse registration order
  3. Shutdown HTTP server with timeout
  4. Close remaining connections

Example:

// Main application
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := manager.StopAll(ctx); err != nil {
    log.Printf("Graceful shutdown incomplete: %v", err)
}

Testing

The package provides ServiceSuite for integration testing with testcontainers:

func TestMyService(t *testing.T) {
    suite := service.NewServiceSuite(t)
    defer suite.Cleanup()

    // Suite provides NATS client, config manager, etc.
    svc, err := NewMyService(suite.Deps(), config)
    require.NoError(t, err)

    // Test service lifecycle
    err = svc.Initialize(suite.Context())
    require.NoError(t, err)
}

Security Considerations

The service HTTP APIs are designed for internal edge deployment:

  • No built-in authentication (add reverse proxy for production)
  • No rate limiting (implement at gateway level)
  • Path traversal protection on component endpoints
  • Input validation on all HTTP handlers

For production deployments, add external security layers:

  • Reverse proxy with authentication (nginx, Traefik)
  • Network policies to restrict access
  • TLS termination at gateway
  • Rate limiting at gateway level

Example: Complete Service Implementation

package main

import (
    "context"
    "log"
    "log/slog"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/c360studio/semstreams/config"
    "github.com/c360studio/semstreams/metric"
    "github.com/c360studio/semstreams/natsclient"
    "github.com/c360studio/semstreams/service"
)

func main() {
    // Load configuration
    cfg, err := config.LoadMinimalConfig("config.json")
    if err != nil {
        log.Fatal(err)
    }

    // Initialize dependencies
    natsClient, err := natsclient.NewClient(cfg.NATS)
    if err != nil {
        log.Fatal(err)
    }
    defer natsClient.Close()

    metricsRegistry := metric.NewMetricsRegistry()
    configMgr := config.NewConfigManager(natsClient, cfg)

    deps := service.Dependencies{
        NATSClient:      natsClient,
        Manager:         configMgr,
        MetricsRegistry: metricsRegistry,
        Logger:          slog.Default(),
        Platform:        cfg.Platform,
    }

    // Create service manager
    manager := service.NewServiceManager(deps)

    // Register services (flowEngine and flowStore are assumed to be constructed elsewhere)
    manager.RegisterConstructor("flow-service", func(d service.Dependencies) (service.Service, error) {
        return service.NewFlowService(d, flowEngine, flowStore)
    })

    // Initialize and start
    ctx := context.Background()
    if err := manager.InitializeAll(ctx); err != nil {
        log.Fatal(err)
    }
    if err := manager.StartAll(ctx); err != nil {
        log.Fatal(err)
    }

    // Wait for signal
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig

    // Graceful shutdown
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    if err := manager.StopAll(shutdownCtx); err != nil {
        log.Printf("Shutdown error: %v", err)
    }
}

For more details and examples, see the README.md in this directory.

Package service provides the Heartbeat service for emitting periodic system health logs.

Package service provides the LogForwarder service for log management.

Package service provides the MessageLogger service for observing message flow.

Package service provides the MetricsForwarder service for forwarding metrics to NATS.

Package service provides OpenAPI specification types for HTTP endpoint documentation.

Package service provides the OpenAPI registry for service specifications.

Package service provides service registration.

Package service provides service registration and management.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetAllOpenAPISpecs

func GetAllOpenAPISpecs() map[string]*OpenAPISpec

GetAllOpenAPISpecs returns all registered OpenAPI specifications. Used by the openapi-generator tool to collect specs from all services.

func RegisterAll

func RegisterAll(registry *Registry) error

RegisterAll registers all built-in services with the registry. Future: can be split into RegisterCore(), RegisterMonitoring(), etc.

func RegisterOpenAPISpec

func RegisterOpenAPISpec(name string, spec *OpenAPISpec)

RegisterOpenAPISpec registers an OpenAPI specification for a service. This should be called from init() functions in service files.

func SchemaFromType

func SchemaFromType(t reflect.Type) map[string]any

SchemaFromType generates a JSON Schema from a reflect.Type. It handles primitives, structs, slices, maps, pointers, and time.Time.
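
To make the mapping concrete, here is a simplified reflection walk covering the same kinds. It is a sketch and not the package's implementation: schemaOf, schemaOfValue, and the sample type are hypothetical, the choice of "date-time" for time.Time and "additionalProperties" for maps is an assumption, and recursive types are not handled.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
	"time"
)

var timeType = reflect.TypeOf(time.Time{})

// schemaOf converts a Go type into a JSON-Schema-like map.
func schemaOf(t reflect.Type) map[string]any {
	if t == timeType {
		return map[string]any{"type": "string", "format": "date-time"}
	}
	switch t.Kind() {
	case reflect.Pointer:
		return schemaOf(t.Elem()) // a pointer is described by its element type
	case reflect.Bool:
		return map[string]any{"type": "boolean"}
	case reflect.String:
		return map[string]any{"type": "string"}
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return map[string]any{"type": "integer"}
	case reflect.Float32, reflect.Float64:
		return map[string]any{"type": "number"}
	case reflect.Slice, reflect.Array:
		return map[string]any{"type": "array", "items": schemaOf(t.Elem())}
	case reflect.Map:
		return map[string]any{"type": "object", "additionalProperties": schemaOf(t.Elem())}
	case reflect.Struct:
		props := map[string]any{}
		for i := 0; i < t.NumField(); i++ {
			f := t.Field(i)
			if !f.IsExported() {
				continue
			}
			// Use the json tag name when present, otherwise the field name.
			name := strings.Split(f.Tag.Get("json"), ",")[0]
			if name == "-" {
				continue
			}
			if name == "" {
				name = f.Name
			}
			props[name] = schemaOf(f.Type)
		}
		return map[string]any{"type": "object", "properties": props}
	}
	return map[string]any{} // unknown kinds get an empty (permissive) schema
}

// schemaOfValue is a convenience wrapper for callers holding a value.
func schemaOfValue(v any) map[string]any { return schemaOf(reflect.TypeOf(v)) }

// sample is a hypothetical response type used only for this illustration.
type sample struct {
	Name      string     `json:"name"`
	Tags      []string   `json:"tags,omitempty"`
	StartTime *time.Time `json:"start_time"`
	secret    string
}

func main() {
	out, _ := json.MarshalIndent(schemaOfValue(sample{}), "", "  ")
	fmt.Println(string(out))
}
```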

func TypeNameFromReflect

func TypeNameFromReflect(t reflect.Type) string

TypeNameFromReflect extracts a clean type name from a reflect.Type. For example: "service.RuntimeHealthResponse" -> "RuntimeHealthResponse"
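
One way such a helper could work is to strip the package qualifier from the type's string form. The sketch below is an assumption about the approach, not the package's code; cleanTypeName and nameOf are hypothetical names, and composite types such as maps are only handled approximately.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// RuntimeHealthResponse is a local stand-in for the type named in the example above.
type RuntimeHealthResponse struct{ Healthy bool }

// cleanTypeName drops pointer indirection and the package prefix.
func cleanTypeName(t reflect.Type) string {
	if t == nil {
		return ""
	}
	for t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	s := t.String() // e.g. "main.RuntimeHealthResponse"
	if i := strings.LastIndex(s, "."); i >= 0 {
		return s[i+1:]
	}
	return s
}

// nameOf is a convenience wrapper for callers holding a value.
func nameOf(v any) string { return cleanTypeName(reflect.TypeOf(v)) }

func main() {
	fmt.Println(nameOf(RuntimeHealthResponse{}))  // RuntimeHealthResponse
	fmt.Println(nameOf(&RuntimeHealthResponse{})) // RuntimeHealthResponse
	fmt.Println(nameOf(42))                       // int
}
```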

Types

type BaseService

type BaseService struct {
	// contains filtered or unexported fields
}

BaseService provides common functionality for all services

func NewBaseServiceWithOptions

func NewBaseServiceWithOptions(name string, cfg *config.Config, opts ...Option) *BaseService

NewBaseServiceWithOptions creates a new base service using functional options pattern

func (*BaseService) GetStatus

func (s *BaseService) GetStatus() Info

GetStatus returns the current service information

func (*BaseService) Health

func (s *BaseService) Health() health.Status

Health returns the standard health status for the service

func (*BaseService) IsHealthy

func (s *BaseService) IsHealthy() bool

IsHealthy returns whether the service is healthy

func (*BaseService) Name

func (s *BaseService) Name() string

Name returns the service name

func (*BaseService) OnHealthChange

func (s *BaseService) OnHealthChange(callback func(bool))

OnHealthChange sets a callback for health state changes

func (*BaseService) RegisterMetrics

func (s *BaseService) RegisterMetrics(_ metric.MetricsRegistrar) error

RegisterMetrics allows services to register their own domain-specific metrics

func (*BaseService) SetHealthCheck

func (s *BaseService) SetHealthCheck(fn HealthCheckFunc)

SetHealthCheck sets a custom health check function

func (*BaseService) Start

func (s *BaseService) Start(ctx context.Context) error

Start starts the service

func (*BaseService) Status

func (s *BaseService) Status() Status

Status returns the current service status

func (*BaseService) Stop

func (s *BaseService) Stop(timeout time.Duration) error

Stop stops the service gracefully

type ClientState

type ClientState struct {
	// contains filtered or unexported fields
}

ClientState represents client subscription state

func (*ClientState) GetLogLevel

func (cs *ClientState) GetLogLevel() string

GetLogLevel returns the current log level filter

func (*ClientState) GetSources

func (cs *ClientState) GetSources() []string

GetSources returns the current source filters

func (*ClientState) GetSubscribedTypes

func (cs *ClientState) GetSubscribedTypes() []string

GetSubscribedTypes returns the list of currently subscribed message types

func (*ClientState) IsSubscribed

func (cs *ClientState) IsSubscribed(messageType string) bool

IsSubscribed checks if the client is subscribed to a message type

func (*ClientState) ShouldReceiveLogLevel

func (cs *ClientState) ShouldReceiveLogLevel(level string) bool

ShouldReceiveLogLevel checks if the client should receive logs at the given level. Log level hierarchy: DEBUG=0 < INFO=1 < WARN=2 < ERROR=3.
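
The hierarchy lends itself to a simple rank comparison. In this sketch, shouldReceive is a hypothetical free function; that a client receives entries at or above its filter level, and that unknown levels are rejected, are both assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// levelRank encodes the documented hierarchy: DEBUG=0 < INFO=1 < WARN=2 < ERROR=3.
var levelRank = map[string]int{"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}

// shouldReceive reports whether an entry at level passes a filter set to filter.
func shouldReceive(filter, level string) bool {
	threshold, okFilter := levelRank[strings.ToUpper(filter)]
	rank, okLevel := levelRank[strings.ToUpper(level)]
	if !okFilter || !okLevel {
		return false // assumption: unknown levels are never delivered
	}
	return rank >= threshold
}

func main() {
	fmt.Println(shouldReceive("WARN", "ERROR")) // true
	fmt.Println(shouldReceive("WARN", "INFO"))  // false
}
```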

func (*ClientState) ShouldReceiveSource

func (cs *ClientState) ShouldReceiveSource(source string) bool

ShouldReceiveSource checks if the client should receive logs from the given source

func (*ClientState) UpdateSubscription

func (cs *ClientState) UpdateSubscription(messageTypes []string, logLevel string, sources []string)

UpdateSubscription updates the client's subscription filters. Empty arrays are treated as "keep current subscriptions" rather than "unsubscribe all".
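
The "empty means keep" rule can be illustrated with a small stand-in type. The filters struct and its update method are hypothetical; treating an empty log level the same way as an empty array is an assumption.

```go
package main

import "fmt"

// filters is a simplified stand-in for the client's subscription state.
type filters struct {
	messageTypes []string
	logLevel     string
	sources      []string
}

// update replaces only the filters for which a non-empty value was supplied.
func (f *filters) update(messageTypes []string, logLevel string, sources []string) {
	if len(messageTypes) > 0 {
		f.messageTypes = append([]string(nil), messageTypes...) // copy to avoid aliasing
	}
	if logLevel != "" { // assumption: empty level also means "keep current"
		f.logLevel = logLevel
	}
	if len(sources) > 0 {
		f.sources = append([]string(nil), sources...)
	}
}

func main() {
	f := &filters{messageTypes: []string{"log_entry"}, logLevel: "INFO"}
	f.update(nil, "", []string{"udp"})
	fmt.Println(f.messageTypes, f.logLevel, f.sources) // [log_entry] INFO [udp]
}
```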

type ComponentGap

type ComponentGap struct {
	ComponentName string   `json:"component_name"`
	Issue         string   `json:"issue"`
	Description   string   `json:"description"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

ComponentGap represents a connectivity gap in the component flow

type ComponentHealth

type ComponentHealth struct {
	Name          string              `json:"name"`
	Component     string              `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
	Type          types.ComponentType `json:"type"`      // Component category (input/processor/output/storage/gateway)
	Status        string              `json:"status"`    // "running", "degraded", "error", "stopped"
	Healthy       bool                `json:"healthy"`
	Message       string              `json:"message"`
	StartTime     *time.Time          `json:"start_time"`     // ISO 8601 timestamp, null if not started
	LastActivity  *time.Time          `json:"last_activity"`  // ISO 8601 timestamp, null if no activity
	UptimeSeconds *float64            `json:"uptime_seconds"` // null if not started
	Details       any                 `json:"details"`        // Additional details for degraded/error states
}

ComponentHealth represents health and timing for a single component

type ComponentManager

type ComponentManager struct {
	*BaseService
	// contains filtered or unexported fields
}

ComponentManager handles lifecycle management of all components (inputs, processors, outputs) through the unified component system.

ComponentManager follows this lifecycle:

Initialize() - Create components but don't start them
Start(ctx)   - Start initialized components with context
Stop()       - Stop components in reverse order

func (*ComponentManager) Component

func (cm *ComponentManager) Component(name string) component.Discoverable

Component retrieves a specific component instance by name

func (*ComponentManager) CreateComponent

func (cm *ComponentManager) CreateComponent(
	ctx context.Context, instanceName string, cfg types.ComponentConfig, deps component.Dependencies,
) error

CreateComponent creates a new component instance and registers it. This is for runtime component creation, not part of the normal Initialize/Start flow.

func (*ComponentManager) CreateComponentsFromConfig

func (cm *ComponentManager) CreateComponentsFromConfig(ctx context.Context, cfg *config.Config) error

CreateComponentsFromConfig creates and initializes components based on configuration

func (*ComponentManager) DetectObjectStoreGaps

func (cm *ComponentManager) DetectObjectStoreGaps() []ComponentGap

DetectObjectStoreGaps identifies disconnected storage components

func (*ComponentManager) GetComponentHealth

func (cm *ComponentManager) GetComponentHealth() map[string]component.HealthStatus

GetComponentHealth returns the current health status for all managed components. It queries each component directly through the component.Health() interface.

func (*ComponentManager) GetComponentStatus

func (cm *ComponentManager) GetComponentStatus() map[string]ComponentStatus

GetComponentStatus returns combined lifecycle state and health status for all components

func (*ComponentManager) GetFlowGraph

func (cm *ComponentManager) GetFlowGraph() *flowgraph.FlowGraph

GetFlowGraph returns the current FlowGraph, using cache if valid

func (*ComponentManager) GetFlowPaths

func (cm *ComponentManager) GetFlowPaths() map[string][]string

GetFlowPaths returns data paths from input components to all reachable components

func (*ComponentManager) GetHealthyComponents

func (cm *ComponentManager) GetHealthyComponents() []string

GetHealthyComponents returns names of components that report healthy status

func (*ComponentManager) GetManagedComponents

func (cm *ComponentManager) GetManagedComponents() map[string]*component.ManagedComponent

GetManagedComponents returns a copy of all managed components with their state

func (*ComponentManager) GetRegistry

func (cm *ComponentManager) GetRegistry() *component.Registry

GetRegistry returns the component registry for schema introspection. This is used by the schema API to access component schemas.

func (*ComponentManager) GetUnhealthyComponents

func (cm *ComponentManager) GetUnhealthyComponents() []string

GetUnhealthyComponents returns names of components that report unhealthy status

func (*ComponentManager) Initialize

func (cm *ComponentManager) Initialize() error

Initialize creates all configured components but does not start them. This follows the unified Pattern A lifecycle, where creation is separate from starting.

func (*ComponentManager) IsInitialized

func (cm *ComponentManager) IsInitialized() bool

IsInitialized returns true if the component manager is initialized

func (*ComponentManager) IsStarted

func (cm *ComponentManager) IsStarted() bool

IsStarted returns true if the component manager is started

func (*ComponentManager) ListComponents

func (cm *ComponentManager) ListComponents() map[string]component.Discoverable

ListComponents returns all registered component instances

func (*ComponentManager) OpenAPISpec

func (cm *ComponentManager) OpenAPISpec() *OpenAPISpec

OpenAPISpec returns the OpenAPI specification for ComponentManager endpoints

func (*ComponentManager) RegisterComponentErrorHook

func (cm *ComponentManager) RegisterComponentErrorHook(hook func(ctx context.Context, name string, err error))

RegisterComponentErrorHook registers a callback for component error events

func (*ComponentManager) RegisterComponentStartHook

func (cm *ComponentManager) RegisterComponentStartHook(
	hook func(ctx context.Context, name string, comp component.Discoverable),
)

RegisterComponentStartHook registers a callback for component start events

func (*ComponentManager) RegisterComponentStopHook

func (cm *ComponentManager) RegisterComponentStopHook(hook func(ctx context.Context, name string, reason string))

RegisterComponentStopHook registers a callback for component stop events

func (*ComponentManager) RegisterHTTPHandlers

func (cm *ComponentManager) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers HTTP endpoints for the ComponentManager service

func (*ComponentManager) RegisterHealthChangeHook

func (cm *ComponentManager) RegisterHealthChangeHook(
	hook func(ctx context.Context, name string, healthy bool, details string),
)

RegisterHealthChangeHook registers a callback for health change events

func (*ComponentManager) RemoveComponent

func (cm *ComponentManager) RemoveComponent(instanceName string) error

RemoveComponent stops and removes a component instance

func (*ComponentManager) Start

func (cm *ComponentManager) Start(ctx context.Context) error

Start starts all initialized components with proper context flow-through

func (*ComponentManager) Stop

func (cm *ComponentManager) Stop(timeout time.Duration) error

Stop gracefully stops all components in reverse order of startup

func (*ComponentManager) ValidateFlowConnectivity

func (cm *ComponentManager) ValidateFlowConnectivity() *flowgraph.FlowAnalysisResult

ValidateFlowConnectivity performs FlowGraph connectivity analysis with caching

type ComponentManagerConfig

type ComponentManagerConfig struct {
	// WatchConfig enables dynamic configuration updates via NATS KV bucket.
	// When true, the manager watches for changes to component configurations
	// and applies them at runtime without service restart.
	WatchConfig bool `json:"watch_config" schema:"type:boolean,description:Enable config watching via NATS KV,default:false,category:basic"`

	// EnabledComponents lists component names to enable.
	// If empty, all registered components are enabled.
	// Use this to selectively enable specific components in a deployment.
	EnabledComponents []string `json:"enabled_components" schema:"type:array,description:List of component names to enable (empty=all),category:basic"`
}

ComponentManagerConfig configures the ComponentManager service.

The ComponentManager orchestrates component lifecycle (create, start, stop) and optionally watches for configuration changes via NATS KV.

func DefaultComponentManagerConfig

func DefaultComponentManagerConfig() ComponentManagerConfig

DefaultComponentManagerConfig returns the default configuration.

func (ComponentManagerConfig) Validate

func (c ComponentManagerConfig) Validate() error

Validate checks if the configuration is valid

type ComponentMetric

type ComponentMetric struct {
	Name        string              `json:"name"`
	Component   string              `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
	Type        types.ComponentType `json:"type"`      // Component category (input/processor/output/storage/gateway)
	Status      string              `json:"status"`
	Throughput  *float64            `json:"throughput"`   // msgs/sec, null if unavailable
	ErrorRate   *float64            `json:"error_rate"`   // errors/sec, null if unavailable
	QueueDepth  *float64            `json:"queue_depth"`  // current queue depth, null if unavailable
	RawCounters *map[string]uint64  `json:"raw_counters"` // only present when Prometheus unavailable
}

ComponentMetric represents metrics for a single component

type ComponentPortDetail

type ComponentPortDetail struct {
	Name      string              `json:"name"`
	Direction component.Direction `json:"direction"`
	Subject   string              `json:"subject"`
	PortType  string              `json:"port_type"`
}

ComponentPortDetail represents detailed information about a single port

type ComponentPortInfo

type ComponentPortInfo struct {
	ComponentName string                `json:"component_name"`
	InputPorts    []ComponentPortDetail `json:"input_ports"`
	OutputPorts   []ComponentPortDetail `json:"output_ports"`
}

ComponentPortInfo represents port information extracted from a component

type ComponentPortReference

type ComponentPortReference struct {
	ComponentName string `json:"component_name"`
	PortName      string `json:"port_name"`
}

ComponentPortReference references a specific port on a component

type ComponentStatus

type ComponentStatus struct {
	Name      string                 `json:"name"`
	State     component.State        `json:"state"`
	Health    component.HealthStatus `json:"health"`
	DataFlow  component.FlowMetrics  `json:"data_flow"`
	LastError error                  `json:"last_error,omitempty"`
}

ComponentStatus combines lifecycle state with health and flow metrics

type ComponentsSpec

type ComponentsSpec struct {
	Schemas map[string]any `json:"schemas,omitempty"`
}

ComponentsSpec holds reusable OpenAPI component definitions

type ConfigSchema

type ConfigSchema struct {
	component.ConfigSchema

	// ServiceSpecific can hold any service-specific schema extensions
	ServiceSpecific map[string]any `json:"service_specific,omitempty"`
}

ConfigSchema describes the configuration parameters for a service. We embed the component ConfigSchema for consistency across the system.

func NewConfigSchema

func NewConfigSchema(properties map[string]PropertySchema, required []string) ConfigSchema

NewConfigSchema creates a service ConfigSchema with extended property schemas

type Configurable

type Configurable interface {
	// ConfigSchema returns the configuration schema for this service
	ConfigSchema() ConfigSchema
}

Configurable is an optional interface for services that expose their configuration schema. This enables UI discovery and validation of service configurations. Services implementing this interface can describe their configuration parameters, including which fields can be changed at runtime without restart.

NOTE: This interface is reserved for future UI features. Currently only the Metrics service implements it. The Discovery service exists to expose this information via HTTP API.

type Constructor

type Constructor func(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

Constructor defines the standard constructor signature for all services. Every service must have a constructor that follows this pattern. The constructor receives raw JSON config and must handle its own parsing.
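
A constructor following this signature might look like the sketch below. Service and Dependencies are reduced to local stand-ins so the example compiles on its own; echoService, echoConfig, and the "hello" default are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for the package types, trimmed to what the sketch needs.
type Service interface{ Name() string }
type Dependencies struct{}
type Constructor func(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

// echoConfig is a hypothetical service configuration.
type echoConfig struct {
	Greeting string `json:"greeting"`
}

type echoService struct{ cfg echoConfig }

func (e *echoService) Name() string { return "echo" }

// newEchoService parses its own raw JSON, starting from defaults.
func newEchoService(raw json.RawMessage, _ *Dependencies) (Service, error) {
	cfg := echoConfig{Greeting: "hello"}
	if len(raw) > 0 {
		if err := json.Unmarshal(raw, &cfg); err != nil {
			return nil, fmt.Errorf("echo: parse config: %w", err)
		}
	}
	return &echoService{cfg: cfg}, nil
}

// Compile-time check that the function matches the Constructor signature.
var _ Constructor = newEchoService

// greeting builds the service from a JSON string and returns the parsed value.
func greeting(raw string) (string, error) {
	svc, err := newEchoService(json.RawMessage(raw), &Dependencies{})
	if err != nil {
		return "", err
	}
	return svc.(*echoService).cfg.Greeting, nil
}

func main() {
	fmt.Println(greeting(`{"greeting":"hi"}`)) // hi <nil>
	fmt.Println(greeting(""))                  // hello <nil>
}
```

Keeping parsing inside the constructor means the registry never needs to know a service's config type.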

type Dependencies

type Dependencies struct {
	NATSClient        *natsclient.Client
	MetricsRegistry   *metric.MetricsRegistry
	Logger            *slog.Logger
	Platform          types.PlatformMeta  // Platform identity
	Manager           *config.Manager     // Centralized configuration management
	ComponentRegistry *component.Registry // Component registry for ComponentManager
	ServiceManager    *Manager            // Service manager for accessing other services
}

Dependencies provides the standard dependencies that all services receive. This replaces the old Dependencies struct and provides consistent injection. Services should use HTTP or NATS RPC for inter-service communication.

type ErrorResponse

type ErrorResponse struct {
	Type    string `json:"type"`    // Always "error"
	Code    string `json:"code"`    // Error code: "invalid_json", "unknown_command", "missing_command"
	Message string `json:"message"` // Human-readable error message
}

ErrorResponse is sent to client when a command fails (Server → Client)

type FlowConnection

type FlowConnection struct {
	Publisher  ComponentPortReference `json:"publisher"`
	Subscriber ComponentPortReference `json:"subscriber"`
	Subject    string                 `json:"subject"`
}

FlowConnection represents a connection between publisher and subscriber

type FlowGap

type FlowGap struct {
	ComponentName string `json:"component_name"`
	PortName      string `json:"port_name"`
	Subject       string `json:"subject"`
	Direction     string `json:"direction"` // "input" or "output"
	Issue         string `json:"issue"`     // "no_publishers" or "no_subscribers"
}

FlowGap represents a disconnected port (no matching publisher/subscriber)
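
Gap detection of this kind can be sketched as a subject lookup in both directions. The port and flowGap types and the findGaps function are hypothetical; the sketch matches subjects by exact string equality and ignores NATS wildcards.

```go
package main

import "fmt"

// port is a simplified view of a component port.
type port struct {
	Component string
	Name      string
	Subject   string
	Direction string // "input" or "output"
}

// flowGap mirrors the fields of FlowGap for this illustration.
type flowGap struct {
	ComponentName, PortName, Subject, Direction, Issue string
}

// findGaps reports inputs with no publisher and outputs with no subscriber.
func findGaps(ports []port) []flowGap {
	publishers := map[string]bool{}
	subscribers := map[string]bool{}
	for _, p := range ports {
		if p.Direction == "output" {
			publishers[p.Subject] = true
		} else {
			subscribers[p.Subject] = true
		}
	}
	var gaps []flowGap
	for _, p := range ports {
		issue := ""
		switch {
		case p.Direction == "input" && !publishers[p.Subject]:
			issue = "no_publishers"
		case p.Direction == "output" && !subscribers[p.Subject]:
			issue = "no_subscribers"
		}
		if issue != "" {
			gaps = append(gaps, flowGap{
				ComponentName: p.Component, PortName: p.Name,
				Subject: p.Subject, Direction: p.Direction, Issue: issue,
			})
		}
	}
	return gaps
}

// demoPorts is a hypothetical three-component flow with two dangling ports.
func demoPorts() []port {
	return []port{
		{"udp", "out", "raw.udp", "output"},
		{"parser", "in", "raw.udp", "input"},
		{"parser", "out", "parsed", "output"},
		{"archiver", "in", "archive.in", "input"},
	}
}

func main() {
	for _, g := range findGaps(demoPorts()) {
		fmt.Printf("%s.%s (%s): %s\n", g.ComponentName, g.PortName, g.Subject, g.Issue)
	}
}
```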

type FlowService

type FlowService struct {
	*BaseService
	// contains filtered or unexported fields
}

FlowService provides HTTP APIs for visual flow builder

func (*FlowService) OpenAPISpec

func (fs *FlowService) OpenAPISpec() *OpenAPISpec

OpenAPISpec returns the OpenAPI specification for flow service endpoints

func (*FlowService) RegisterHTTPHandlers

func (fs *FlowService) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers HTTP endpoints for the flow service

func (*FlowService) Start

func (fs *FlowService) Start(ctx context.Context) error

Start starts the flow service

func (*FlowService) Stop

func (fs *FlowService) Stop(timeout time.Duration) error

Stop stops the flow service

type FlowServiceConfig

type FlowServiceConfig struct {
	// PrometheusURL is the base URL for Prometheus HTTP API
	// Default: http://localhost:9090
	PrometheusURL string `json:"prometheus_url,omitempty"`

	// FallbackToRaw enables falling back to raw metrics when Prometheus unavailable
	// Default: true
	FallbackToRaw bool `json:"fallback_to_raw,omitempty"`

	// LogStreamBufferSize is the buffer size for SSE log streaming channel
	// Larger buffers reduce dropped logs during bursts but use more memory.
	// Default: 100
	LogStreamBufferSize int `json:"log_stream_buffer_size,omitempty"`
}

FlowServiceConfig holds configuration for the flow service

type FlowStatusPayload

type FlowStatusPayload struct {
	State     string `json:"state"`           // Current state: draft, deployed, running, stopped, failed
	PrevState string `json:"prev_state"`      // Previous state (if changed)
	Timestamp int64  `json:"timestamp"`       // State change timestamp (Unix milliseconds)
	Error     string `json:"error,omitempty"` // Error message if state=failed
}

FlowStatusPayload is the payload for type=flow_status messages

type HTTPHandler

type HTTPHandler interface {
	RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
	OpenAPISpec() *OpenAPISpec // Returns OpenAPI specification for this service
}

HTTPHandler is an optional interface for services that want to expose HTTP endpoints

type HealthCheckFunc

type HealthCheckFunc func() error

HealthCheckFunc defines a custom health check function

type HeartbeatConfig

type HeartbeatConfig struct {
	// Interval between heartbeat logs (e.g., "30s", "1m")
	// Default: "30s"
	Interval string `json:"interval"`
}

HeartbeatConfig holds configuration for the Heartbeat service

func (HeartbeatConfig) Validate

func (c HeartbeatConfig) Validate() error

Validate checks if the configuration is valid
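
Since Interval is a duration string such as "30s" or "1m", validation plausibly reduces to time.ParseDuration plus a sanity check. The sketch below uses a local heartbeatConfig copy; falling back to the documented 30s default for an empty value and rejecting non-positive durations are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeatConfig is a local copy of the documented config shape.
type heartbeatConfig struct {
	Interval string `json:"interval"`
}

// duration parses Interval, applying the documented default when it is empty.
func (c heartbeatConfig) duration() (time.Duration, error) {
	if c.Interval == "" {
		return 30 * time.Second, nil
	}
	d, err := time.ParseDuration(c.Interval)
	if err != nil {
		return 0, fmt.Errorf("heartbeat: invalid interval %q: %w", c.Interval, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("heartbeat: interval must be positive, got %s", d)
	}
	return d, nil
}

// seconds is a small helper that returns the parsed interval in seconds.
func seconds(interval string) (float64, error) {
	d, err := heartbeatConfig{Interval: interval}.duration()
	return d.Seconds(), err
}

func main() {
	for _, in := range []string{"", "1m", "soon", "-5s"} {
		s, err := seconds(in)
		fmt.Printf("%q -> %v seconds, err=%v\n", in, s, err)
	}
}
```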

type HeartbeatService

type HeartbeatService struct {
	*BaseService
	// contains filtered or unexported fields
}

HeartbeatService emits periodic system heartbeat logs

func (*HeartbeatService) Start

func (hb *HeartbeatService) Start(ctx context.Context) error

Start begins the heartbeat service

func (*HeartbeatService) Stop

func (hb *HeartbeatService) Stop(timeout time.Duration) error

Stop gracefully stops the heartbeat service

type Info

type Info struct {
	Name               string        `json:"name"`
	Status             Status        `json:"status"`
	Uptime             time.Duration `json:"uptime"`
	StartTime          time.Time     `json:"start_time"`
	MessagesProcessed  int64         `json:"messages_processed"`
	LastActivity       time.Time     `json:"last_activity"`
	HealthChecks       int64         `json:"health_checks"`
	FailedHealthChecks int64         `json:"failed_health_checks"`
}

Info holds runtime information for a service

type InfoSpec

type InfoSpec struct {
	Title       string `json:"title"`
	Description string `json:"description"`
	Version     string `json:"version"`
}

InfoSpec contains API metadata

type KVTestHelper

type KVTestHelper struct {
	// contains filtered or unexported fields
}

KVTestHelper provides utilities for KV-based testing with JSON-only format

func NewKVTestHelper

func NewKVTestHelper(t *testing.T, nc *natsclient.Client) *KVTestHelper

NewKVTestHelper creates an isolated test KV bucket

func (*KVTestHelper) AssertValidKVKey

func (h *KVTestHelper) AssertValidKVKey(key string)

AssertValidKVKey validates key format per KV schema requirements

func (*KVTestHelper) GetServiceConfig

func (h *KVTestHelper) GetServiceConfig(service string) (map[string]any, uint64, error)

GetServiceConfig reads current service configuration

func (*KVTestHelper) SimulateConcurrentUpdate

func (h *KVTestHelper) SimulateConcurrentUpdate(service string) error

SimulateConcurrentUpdate tests optimistic locking behavior

func (*KVTestHelper) UpdateServiceConfig

func (h *KVTestHelper) UpdateServiceConfig(service string, updateFn func(config map[string]any) error) error

UpdateServiceConfig updates with optimistic locking (uses KVStore CAS)
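
Optimistic locking with compare-and-set usually follows a read, modify, conditional-write loop that retries on a revision conflict. The sketch below uses an in-memory store rather than a NATS KV bucket; store, compareAndSet, update, and demo are hypothetical names, and the retry count is arbitrary.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("revision conflict")

// store is an in-memory stand-in for a revisioned KV entry.
type store struct {
	mu  sync.Mutex
	cfg map[string]any
	rev uint64
}

// get returns a copy of the config together with its revision.
func (s *store) get() (map[string]any, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]any, len(s.cfg))
	for k, v := range s.cfg {
		out[k] = v
	}
	return out, s.rev
}

// compareAndSet writes cfg only if the revision is still the expected one.
func (s *store) compareAndSet(cfg map[string]any, expected uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rev != expected {
		return errConflict
	}
	s.cfg, s.rev = cfg, s.rev+1
	return nil
}

// update applies fn to a fresh copy and retries when another writer got in first.
func update(s *store, fn func(map[string]any) error, attempts int) error {
	for i := 0; i < attempts; i++ {
		cfg, rev := s.get()
		if err := fn(cfg); err != nil {
			return err
		}
		err := s.compareAndSet(cfg, rev)
		if !errors.Is(err, errConflict) {
			return err // nil on success, or an unexpected error
		}
	}
	return errConflict
}

// demo forces one conflict and reports how many attempts were needed.
func demo() (calls int, rev uint64, err error) {
	s := &store{cfg: map[string]any{}}
	err = update(s, func(cfg map[string]any) error {
		calls++
		if calls == 1 {
			// Simulate a concurrent writer landing between read and write.
			other, r := s.get()
			other["other"] = true
			_ = s.compareAndSet(other, r)
		}
		cfg["enabled"] = true
		return nil
	}, 3)
	_, rev = s.get()
	return calls, rev, err
}

func main() {
	fmt.Println(demo()) // 2 2 <nil>
}
```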

func (*KVTestHelper) WaitForConfigPropagation

func (h *KVTestHelper) WaitForConfigPropagation(_ time.Duration) bool

WaitForConfigPropagation waits for config changes to propagate

func (*KVTestHelper) WriteComponentConfig

func (h *KVTestHelper) WriteComponentConfig(componentType, name string, config map[string]any) uint64

WriteComponentConfig writes component configuration (for future use)

func (*KVTestHelper) WriteServiceConfig

func (h *KVTestHelper) WriteServiceConfig(service string, config map[string]any) uint64

WriteServiceConfig writes a complete service configuration (JSON-only)

type KVWatchConnectedEvent

type KVWatchConnectedEvent struct {
	Bucket  string `json:"bucket"`
	Pattern string `json:"pattern"`
	Message string `json:"message"`
}

KVWatchConnectedEvent represents the initial connection event

type KVWatchEvent

type KVWatchEvent struct {
	Bucket    string          `json:"bucket"`
	Key       string          `json:"key"`
	Operation string          `json:"operation"` // "create", "update", "delete"
	Value     json.RawMessage `json:"value,omitempty"`
	Revision  uint64          `json:"revision"`
	Timestamp time.Time       `json:"timestamp"`
}

KVWatchEvent represents a KV change event sent via SSE
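As a rough illustration of how such an event could be framed for Server-Sent Events, the sketch below marshals a local copy of the struct and wraps it in the standard `event:`/`data:` frame. The helper `frameSSE`, the event name `kv_change`, and the bucket and key values are assumptions for the example; the package's actual handler and event names may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// KVWatchEvent is a local copy of the struct documented above.
type KVWatchEvent struct {
	Bucket    string          `json:"bucket"`
	Key       string          `json:"key"`
	Operation string          `json:"operation"`
	Value     json.RawMessage `json:"value,omitempty"`
	Revision  uint64          `json:"revision"`
	Timestamp time.Time       `json:"timestamp"`
}

// frameSSE encodes payload as single-line JSON and wraps it in an SSE frame.
// A blank line terminates the frame, as the SSE wire format requires.
func frameSSE(eventName string, payload any) (string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", eventName, data), nil
}

func main() {
	ev := KVWatchEvent{
		Bucket:    "example_config", // hypothetical bucket name
		Key:       "services.metrics",
		Operation: "update",
		Value:     json.RawMessage(`{"port":9090}`),
		Revision:  7,
		Timestamp: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
	}
	frame, err := frameSSE("kv_change", ev) // event name is an assumption
	if err != nil {
		panic(err)
	}
	fmt.Print(frame)
}
```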

type LogEntryPayload

type LogEntryPayload struct {
	Level   string         `json:"level"`   // DEBUG, INFO, WARN, ERROR
	Source  string         `json:"source"`  // Component or service name
	Message string         `json:"message"` // Log message
	Fields  map[string]any `json:"fields"`  // Structured log fields
}

LogEntryPayload is the payload for type=log_entry messages

type LogForwarder

type LogForwarder struct {
	*BaseService
	// contains filtered or unexported fields
}

LogForwarder is a service for log configuration management. With the new architecture, actual log forwarding to NATS is handled by NATSLogHandler in pkg/logging. This service validates configuration and provides a service endpoint.

func NewLogForwarder

func NewLogForwarder(config *LogForwarderConfig, opts ...Option) (*LogForwarder, error)

NewLogForwarder creates a new LogForwarder service.

func (*LogForwarder) Config

func (lf *LogForwarder) Config() LogForwarderConfig

Config returns the LogForwarder configuration. This can be used by other components to access the log configuration.

func (*LogForwarder) Start

func (lf *LogForwarder) Start(ctx context.Context) error

Start begins the LogForwarder service. Note: Log forwarding to NATS is handled by NATSLogHandler in main.go. This service provides configuration validation and service lifecycle management.

func (*LogForwarder) Stop

func (lf *LogForwarder) Stop(timeout time.Duration) error

Stop gracefully stops the LogForwarder.

type LogForwarderConfig

type LogForwarderConfig struct {
	// MinLevel is the minimum log level to forward to NATS (DEBUG, INFO, WARN, ERROR).
	// Logs below this level are still written to stdout but not published to NATS.
	MinLevel string `json:"min_level"`

	// ExcludeSources is a list of source prefixes to exclude from NATS forwarding.
	// Logs from excluded sources still go to stdout but are not published to NATS.
	// Uses prefix matching with dotted notation: excluding "flow-service.websocket"
	// also excludes "flow-service.websocket.health" but NOT "flow-service".
	ExcludeSources []string `json:"exclude_sources"`
}

LogForwarderConfig holds configuration for the LogForwarder service. Note: The service is enabled/disabled via types.ServiceConfig.Enabled at the outer level.

Configuration is used by:

- NATSLogHandler (in pkg/logging) for min_level and exclude_sources filtering
- This service for configuration validation
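The ExcludeSources rule (a prefix matches itself and its dotted children, but not its parent) can be sketched as follows. `isExcluded` is a hypothetical helper, and treating the dot as a hard segment boundary is an assumption; the real filter lives in NATSLogHandler and may behave differently at the edges.

```go
package main

import (
	"fmt"
	"strings"
)

// isExcluded reports whether source equals one of the prefixes or sits
// below it in dotted notation. Matching on "prefix." (with the dot) keeps
// "flow-service.websocket2" from matching "flow-service.websocket";
// that boundary handling is an assumption of this sketch.
func isExcluded(source string, excludes []string) bool {
	for _, prefix := range excludes {
		if source == prefix || strings.HasPrefix(source, prefix+".") {
			return true
		}
	}
	return false
}

func main() {
	excludes := []string{"flow-service.websocket"}
	for _, src := range []string{
		"flow-service.websocket",
		"flow-service.websocket.health",
		"flow-service",
	} {
		fmt.Printf("%-32s excluded=%v\n", src, isExcluded(src, excludes))
	}
}
```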

func (LogForwarderConfig) Validate

func (c LogForwarderConfig) Validate() error

Validate checks if the configuration is valid.

type Manager

type Manager struct {
	*BaseService // Embed BaseService to implement Service interface
	// contains filtered or unexported fields
}

Manager manages service lifecycle using a provided registry. Services are explicitly registered and created from raw JSON configs.

func NewServiceManager

func NewServiceManager(registry *Registry) *Manager

NewServiceManager creates a new service manager

func (*Manager) ConfigureFromServices

func (m *Manager) ConfigureFromServices(services map[string]types.ServiceConfig, deps *Dependencies) error

ConfigureFromServices configures the Manager directly from the services config. This replaces the old pattern where Manager was a service itself.

func (*Manager) CreateService

func (m *Manager) CreateService(name string, rawConfig json.RawMessage, deps *Dependencies) (Service, error)

CreateService creates a service instance using the registered constructor

func (*Manager) GetAllServiceStatus

func (m *Manager) GetAllServiceStatus() map[string]any

GetAllServiceStatus returns the status of all services

func (*Manager) GetAllServices

func (m *Manager) GetAllServices() map[string]Service

GetAllServices returns all registered service instances

func (*Manager) GetHealthyServices

func (m *Manager) GetHealthyServices() []string

GetHealthyServices returns a list of healthy services

func (*Manager) GetService

func (m *Manager) GetService(name string) (Service, bool)

GetService returns a service instance by name

func (*Manager) GetServiceRuntimeConfig

func (m *Manager) GetServiceRuntimeConfig(serviceName string) (map[string]any, error)

GetServiceRuntimeConfig returns current runtime configuration for a service

func (*Manager) GetServiceStatus

func (m *Manager) GetServiceStatus(name string) (any, error)

GetServiceStatus returns the status of a specific service

func (*Manager) GetUnhealthyServices

func (m *Manager) GetUnhealthyServices() []string

GetUnhealthyServices returns a list of unhealthy services

func (*Manager) HasConstructor

func (m *Manager) HasConstructor(name string) bool

HasConstructor checks if a constructor is registered

func (*Manager) ListConstructors

func (m *Manager) ListConstructors() []string

ListConstructors returns all registered constructor names

func (*Manager) RemoveService

func (m *Manager) RemoveService(name string)

RemoveService removes a service instance

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the Manager HTTP server if configured

func (*Manager) StartAll

func (m *Manager) StartAll(ctx context.Context) error

StartAll starts all registered service instances and the HTTP server

func (*Manager) StartService

func (m *Manager) StartService(ctx context.Context, name string, rawConfig json.RawMessage, deps *Dependencies) error

StartService creates and starts a single service if not already running

func (*Manager) Stop

func (m *Manager) Stop(timeout time.Duration) error

Stop stops the Manager HTTP server

func (*Manager) StopAll

func (m *Manager) StopAll(timeout time.Duration) error

StopAll stops all registered service instances in reverse order and the HTTP server
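Stopping in reverse order is a common way to make sure a service shuts down before the services it was started after. The sketch below shows only that ordering idea with stand-in types (`service`, `recorder`, `runLifecycle` are hypothetical); it does not reproduce the Manager's bookkeeping, timeouts, or HTTP server handling.

```go
package main

import "fmt"

// recorder collects lifecycle events so the order can be inspected.
type recorder struct{ log []string }

// service is a stand-in for a real Service implementation.
type service struct {
	name string
	rec  *recorder
}

func (s service) start() { s.rec.log = append(s.rec.log, "start "+s.name) }
func (s service) stop()  { s.rec.log = append(s.rec.log, "stop "+s.name) }

// startAll starts services in the order given.
func startAll(svcs []service) {
	for _, s := range svcs {
		s.start()
	}
}

// stopAll walks the slice backwards so the last service started stops first.
func stopAll(svcs []service) {
	for i := len(svcs) - 1; i >= 0; i-- {
		svcs[i].stop()
	}
}

// runLifecycle starts and then stops the named services, returning the log.
func runLifecycle(names ...string) []string {
	rec := &recorder{}
	svcs := make([]service, 0, len(names))
	for _, n := range names {
		svcs = append(svcs, service{name: n, rec: rec})
	}
	startAll(svcs)
	stopAll(svcs)
	return rec.log
}

func main() {
	for _, line := range runLifecycle("metrics", "message-logger", "flow") {
		fmt.Println(line)
	}
}
```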

func (*Manager) StopService

func (m *Manager) StopService(name string, timeout time.Duration) error

StopService stops and removes a single service

type ManagerConfig

type ManagerConfig struct {
	HTTPPort   int      `json:"http_port"`
	SwaggerUI  bool     `json:"swagger_ui"`
	ServerInfo InfoSpec `json:"server_info"`
}

ManagerConfig holds configuration for the Manager HTTP server. Simple struct - no UnmarshalJSON, no Enabled field.

func (ManagerConfig) Validate

func (c ManagerConfig) Validate() error

Validate checks if the configuration is valid

type MessageLogEntry

type MessageLogEntry struct {
	Sequence    uint64          `json:"sequence"` // Monotonic sequence for index validity
	Timestamp   time.Time       `json:"timestamp"`
	Subject     string          `json:"subject"`
	MessageType string          `json:"message_type,omitempty"`
	MessageID   string          `json:"message_id,omitempty"`
	TraceID     string          `json:"trace_id,omitempty"` // W3C trace ID (32 hex chars)
	SpanID      string          `json:"span_id,omitempty"`  // W3C span ID (16 hex chars)
	Summary     string          `json:"summary"`
	RawData     json.RawMessage `json:"raw_data,omitempty"`
	Metadata    map[string]any  `json:"metadata,omitempty"`
}

MessageLogEntry represents a logged message

type MessageLogger

type MessageLogger struct {
	*BaseService
	// contains filtered or unexported fields
}

MessageLogger provides message observation and logging as a service

func NewMessageLogger

func NewMessageLogger(
	loggerConfig *MessageLoggerConfig,
	natsClient *natsclient.Client,
	opts ...Option,
) (*MessageLogger, error)

NewMessageLogger creates a new MessageLogger service

func (*MessageLogger) ApplyConfigUpdate

func (ml *MessageLogger) ApplyConfigUpdate(changes map[string]any) error

ApplyConfigUpdate applies validated configuration changes. This implements the RuntimeConfigurable interface.

func (*MessageLogger) ConfigSchema

func (ml *MessageLogger) ConfigSchema() ConfigSchema

ConfigSchema returns the configuration schema for this service. This implements the Configurable interface for UI discovery.

func (*MessageLogger) GetEntriesByTrace

func (ml *MessageLogger) GetEntriesByTrace(traceID string) []MessageLogEntry

GetEntriesByTrace returns all log entries for a specific trace ID. Entries are returned in chronological order (by sequence number).

func (*MessageLogger) GetLogEntries

func (ml *MessageLogger) GetLogEntries(limit int) []MessageLogEntry

GetLogEntries returns recent log entries with optional limit

func (*MessageLogger) GetMessages

func (ml *MessageLogger) GetMessages() []MessageLogEntry

GetMessages returns recent log entries

func (*MessageLogger) GetRuntimeConfig

func (ml *MessageLogger) GetRuntimeConfig() map[string]any

GetRuntimeConfig returns current configuration values. This implements the RuntimeConfigurable interface.

func (*MessageLogger) GetStatistics

func (ml *MessageLogger) GetStatistics() map[string]any

GetStatistics returns runtime statistics

func (*MessageLogger) OpenAPISpec

func (ml *MessageLogger) OpenAPISpec() *OpenAPISpec

OpenAPISpec returns the OpenAPI specification for MessageLogger endpoints

func (*MessageLogger) RegisterHTTPHandlers

func (ml *MessageLogger) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers HTTP endpoints for the MessageLogger service

func (*MessageLogger) Start

func (ml *MessageLogger) Start(ctx context.Context) error

Start begins message observation

func (*MessageLogger) Stop

func (ml *MessageLogger) Stop(timeout time.Duration) error

Stop gracefully stops the MessageLogger

func (*MessageLogger) ValidateConfigUpdate

func (ml *MessageLogger) ValidateConfigUpdate(changes map[string]any) error

ValidateConfigUpdate checks if the proposed changes are valid. This implements the RuntimeConfigurable interface.

type MessageLoggerConfig

type MessageLoggerConfig struct {
	// Subjects to monitor
	// Use "*" to auto-discover subjects from flow component configs
	// Example: ["*"] or ["*", "debug.>"] or ["raw.udp.messages", "processed.>"]
	MonitorSubjects []string `json:"monitor_subjects"`

	// Maximum entries to keep in memory for querying
	MaxEntries int `json:"max_entries"`

	// Whether to output to stdout
	OutputToStdout bool `json:"output_to_stdout"`

	// Log level threshold (DEBUG, INFO, WARN, ERROR)
	LogLevel string `json:"log_level"`

	// SampleRate controls message sampling (1 in N messages logged)
	// 0 or 1 = log all messages, 10 = log 10% of messages
	SampleRate int `json:"sample_rate"`
}

MessageLoggerConfig holds configuration for the MessageLogger service. Simple struct - no UnmarshalJSON, no Enabled field.
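The SampleRate semantics (0 or 1 logs everything, N logs one message in N) can be illustrated with a counter-based check. This is only a sketch: `shouldLog` and `countLogged` are hypothetical helpers, and whether the real service samples by counter or randomly is not stated in the documentation.

```go
package main

import "fmt"

// shouldLog applies 1-in-N sampling to a message sequence number.
// Rates of 0 or 1 (and, in this sketch, negative rates) log every message.
func shouldLog(seq uint64, sampleRate int) bool {
	if sampleRate <= 1 {
		return true
	}
	return seq%uint64(sampleRate) == 0
}

// countLogged counts how many of the messages 1..total pass the sampler.
func countLogged(total uint64, sampleRate int) int {
	n := 0
	for seq := uint64(1); seq <= total; seq++ {
		if shouldLog(seq, sampleRate) {
			n++
		}
	}
	return n
}

func main() {
	// prints: 100 100 10
	fmt.Println(countLogged(100, 0), countLogged(100, 1), countLogged(100, 10))
}
```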

func DefaultMessageLoggerConfig

func DefaultMessageLoggerConfig() MessageLoggerConfig

DefaultMessageLoggerConfig returns sensible defaults

func (MessageLoggerConfig) Validate

func (c MessageLoggerConfig) Validate() error

Validate checks if the configuration is valid

type MetricEntry

type MetricEntry struct {
	Name   string            `json:"name"`   // Metric name
	Type   string            `json:"type"`   // counter, gauge, histogram
	Value  float64           `json:"value"`  // Current value
	Labels map[string]string `json:"labels"` // Metric labels
}

MetricEntry represents a single metric in a MetricsPayload

type Metrics

type Metrics struct {
	*BaseService
	// contains filtered or unexported fields
}

Metrics is a service that provides Prometheus metrics endpoint

func (*Metrics) ApplyConfigUpdate

func (m *Metrics) ApplyConfigUpdate(changes map[string]any) error

ApplyConfigUpdate applies validated runtime configuration changes

func (*Metrics) ConfigSchema

func (m *Metrics) ConfigSchema() ConfigSchema

ConfigSchema returns the configuration schema for the metrics service. This implements the Configurable interface for UI discovery.

func (*Metrics) GetRuntimeConfig

func (m *Metrics) GetRuntimeConfig() map[string]any

GetRuntimeConfig returns current runtime configuration

func (*Metrics) Path

func (m *Metrics) Path() string

Path returns the metrics endpoint path

func (*Metrics) Port

func (m *Metrics) Port() int

Port returns the port the metrics server is listening on

func (*Metrics) Start

func (m *Metrics) Start(ctx context.Context) error

Start starts the metrics HTTP server

func (*Metrics) Stop

func (m *Metrics) Stop(timeout time.Duration) error

Stop stops the metrics HTTP server

func (*Metrics) URL

func (m *Metrics) URL() string

URL returns the full URL for the metrics endpoint

func (*Metrics) ValidateConfigUpdate

func (m *Metrics) ValidateConfigUpdate(changes map[string]any) error

ValidateConfigUpdate validates runtime configuration changes

type MetricsConfig

type MetricsConfig struct {
	Port int    `json:"port"`
	Path string `json:"path"`
}

MetricsConfig holds configuration for the metrics service. Simple struct - no UnmarshalJSON, no Enabled field.

func (MetricsConfig) Validate

func (c MetricsConfig) Validate() error

Validate checks if the configuration is valid

type MetricsForwarder

type MetricsForwarder struct {
	*BaseService
	// contains filtered or unexported fields
}

MetricsForwarder implements periodic metrics publishing to NATS

func (*MetricsForwarder) Start

func (mf *MetricsForwarder) Start(ctx context.Context) error

Start begins metrics forwarding

func (*MetricsForwarder) Stop

func (mf *MetricsForwarder) Stop(timeout time.Duration) error

Stop gracefully stops the MetricsForwarder

type MetricsForwarderConfig

type MetricsForwarderConfig struct {
	// Push interval for metrics publishing (e.g., "5s", "1m")
	PushInterval string `json:"push_interval"`

	// IncludeGoMetrics enables forwarding of go_* runtime metrics (goroutines, memory, GC)
	// Default: false (excluded to reduce noise)
	IncludeGoMetrics bool `json:"include_go_metrics"`

	// IncludeProcMetrics enables forwarding of process_* metrics (CPU, open FDs, memory)
	// Default: false (excluded to reduce noise)
	IncludeProcMetrics bool `json:"include_proc_metrics"`
}

MetricsForwarderConfig holds configuration for the MetricsForwarder service. Note: The service is enabled/disabled via types.ServiceConfig.Enabled at the outer level. If the service is created, it will forward metrics.

func (MetricsForwarderConfig) Validate

func (c MetricsForwarderConfig) Validate() error

Validate checks if the configuration is valid
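The PushInterval examples ("5s", "1m") match Go's duration syntax, so a check along the following lines is plausible. It is a sketch under that assumption: `parsePushInterval` is a hypothetical helper, and rejecting zero or negative intervals is a choice made for the example, not documented behavior of Validate.

```go
package main

import (
	"fmt"
	"time"
)

// parsePushInterval parses a push_interval string such as "5s" or "1m"
// using time.ParseDuration and rejects non-positive values.
func parsePushInterval(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid push_interval %q: %w", s, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("push_interval must be positive, got %s", d)
	}
	return d, nil
}

func main() {
	for _, s := range []string{"5s", "1m", "soon", "0s"} {
		d, err := parsePushInterval(s)
		fmt.Printf("%-6q duration=%v err=%v\n", s, d, err)
	}
}
```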

type MetricsGatherer

type MetricsGatherer interface {
	Gather() ([]*dto.MetricFamily, error)
}

MetricsGatherer defines the interface for gathering metrics. This allows for easier testing with mocks.

type MetricsPayload

type MetricsPayload struct {
	Component string        `json:"component"` // Component name
	Metrics   []MetricEntry `json:"metrics"`   // Array of metric values
}

MetricsPayload is the payload for type=component_metrics messages

type OpenAPIDocument

type OpenAPIDocument struct {
	OpenAPI    string              `json:"openapi"`
	Info       InfoSpec            `json:"info"`
	Servers    []ServerSpec        `json:"servers"`
	Paths      map[string]PathSpec `json:"paths"`
	Components *ComponentsSpec     `json:"components,omitempty"`
	Tags       []TagSpec           `json:"tags,omitempty"`
}

OpenAPIDocument represents the complete OpenAPI 3.0 specification

type OpenAPISpec

type OpenAPISpec struct {
	Paths            map[string]PathSpec `json:"paths"`
	Components       map[string]any      `json:"components,omitempty"`
	Tags             []TagSpec           `json:"tags,omitempty"`
	ResponseTypes    []reflect.Type      `json:"-"` // Response types to generate schemas for (not serialized)
	RequestBodyTypes []reflect.Type      `json:"-"` // Request body types to generate schemas for (not serialized)
}

OpenAPISpec represents a service's OpenAPI specification fragment

func GetOpenAPISpec

func GetOpenAPISpec(name string) (*OpenAPISpec, bool)

GetOpenAPISpec returns the OpenAPI specification for a specific service.

func NewOpenAPISpec

func NewOpenAPISpec() *OpenAPISpec

NewOpenAPISpec creates a new OpenAPI specification fragment for a service

func (*OpenAPISpec) AddPath

func (spec *OpenAPISpec) AddPath(path string, pathSpec PathSpec)

AddPath adds a path specification to the OpenAPI spec

func (*OpenAPISpec) AddTag

func (spec *OpenAPISpec) AddTag(name, description string)

AddTag adds a tag to the OpenAPI spec

func (*OpenAPISpec) MarshalJSON

func (spec *OpenAPISpec) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler for OpenAPISpec

func (*OpenAPISpec) UnmarshalJSON

func (spec *OpenAPISpec) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for OpenAPISpec

type OperationSpec

type OperationSpec struct {
	Summary     string                  `json:"summary"`
	Description string                  `json:"description,omitempty"`
	Parameters  []ParameterSpec         `json:"parameters,omitempty"`
	RequestBody *RequestBodySpec        `json:"request_body,omitempty"`
	Responses   map[string]ResponseSpec `json:"responses"`
	Tags        []string                `json:"tags,omitempty"`
}

OperationSpec defines a single HTTP operation

type Option

type Option func(*BaseService)

Option is a functional option for configuring BaseService

func OnHealthChange

func OnHealthChange(fn func(bool)) Option

OnHealthChange sets a callback for health state changes

func WithHealthCheck

func WithHealthCheck(fn HealthCheckFunc) Option

WithHealthCheck sets a custom health check function

func WithHealthInterval

func WithHealthInterval(interval time.Duration) Option

WithHealthInterval sets the health check interval

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets a custom logger for the service

func WithMetrics

func WithMetrics(registry *metric.MetricsRegistry) Option

WithMetrics sets the metrics registry for the service

func WithNATS

func WithNATS(client *natsclient.Client) Option

WithNATS sets the NATS client for the service
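The Option functions above follow the functional-options pattern. The sketch below reproduces the pattern with a stand-in `baseService`; its fields, the lowercase option names, and the 30-second default are assumptions for illustration, since BaseService's internals are unexported.

```go
package main

import (
	"fmt"
	"time"
)

// baseService is a stand-in with hypothetical fields.
type baseService struct {
	name           string
	healthInterval time.Duration
	onHealthChange func(bool)
}

// option mirrors the shape of Option: a function that mutates the service.
type option func(*baseService)

func withHealthInterval(d time.Duration) option {
	return func(s *baseService) { s.healthInterval = d }
}

func onHealthChange(fn func(bool)) option {
	return func(s *baseService) { s.onHealthChange = fn }
}

// newBaseService sets defaults first, then lets each option override them.
func newBaseService(name string, opts ...option) *baseService {
	s := &baseService{
		name:           name,
		healthInterval: 30 * time.Second, // default chosen for this sketch
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newBaseService("metrics",
		withHealthInterval(5*time.Second),
		onHealthChange(func(healthy bool) { fmt.Println("healthy:", healthy) }),
	)
	fmt.Println(s.name, s.healthInterval)
	s.onHealthChange(false)
}
```

Options are applied in argument order, so a later option overrides an earlier one that sets the same field.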

type OverallHealth

type OverallHealth struct {
	Status        string `json:"status"` // "healthy", "degraded", "error"
	RunningCount  int    `json:"running_count"`
	DegradedCount int    `json:"degraded_count"`
	ErrorCount    int    `json:"error_count"`
}

OverallHealth provides aggregate health status and counts

type ParameterSpec

type ParameterSpec struct {
	Name        string `json:"name"`
	In          string `json:"in"` // "query", "path", "header"
	Description string `json:"description,omitempty"`
	Required    bool   `json:"required,omitempty"`
	Schema      Schema `json:"schema,omitempty"`
}

ParameterSpec defines an operation parameter

type PathSpec

type PathSpec struct {
	GET    *OperationSpec `json:"get,omitempty"`
	POST   *OperationSpec `json:"post,omitempty"`
	PUT    *OperationSpec `json:"put,omitempty"`
	PATCH  *OperationSpec `json:"patch,omitempty"`
	DELETE *OperationSpec `json:"delete,omitempty"`
}

PathSpec defines HTTP operations for a specific path

type PropertySchema

type PropertySchema struct {
	component.PropertySchema

	// Runtime indicates if this property can be changed without restart
	Runtime bool `json:"runtime,omitempty"`

	// Category groups related properties for UI organization
	Category string `json:"category,omitempty"`
}

PropertySchema extends component.PropertySchema with service-specific fields

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages service constructor registration

func NewServiceRegistry

func NewServiceRegistry() *Registry

NewServiceRegistry creates a new service registry

func (*Registry) Constructor

func (r *Registry) Constructor(name string) (Constructor, bool)

Constructor returns a constructor for the given service name

func (*Registry) Constructors

func (r *Registry) Constructors() map[string]Constructor

Constructors returns a copy of all constructors

func (*Registry) Register

func (r *Registry) Register(name string, constructor Constructor) error

Register registers a service constructor

func (*Registry) Services

func (r *Registry) Services() []string

Services returns all registered service names
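A constructor registry of this shape can be sketched as a mutex-guarded map. Everything below is a stand-in: `registry`, `constructor`, `dependencies`, and `echo` are hypothetical, the constructor signature is modeled on the `New...Service(rawConfig, deps)` functions listed in this package, and returning an error on duplicate names is an assumption about why Register returns an error.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"sync"
)

// dependencies and service stand in for the package's Dependencies and Service.
type dependencies struct{}
type service interface{ Name() string }

// constructor builds a service from raw JSON config and injected dependencies.
type constructor func(rawConfig json.RawMessage, deps *dependencies) (service, error)

type registry struct {
	mu    sync.RWMutex
	ctors map[string]constructor
}

func newRegistry() *registry {
	return &registry{ctors: make(map[string]constructor)}
}

// register stores a constructor and rejects empty or duplicate names.
func (r *registry) register(name string, c constructor) error {
	if name == "" || c == nil {
		return fmt.Errorf("name and constructor are required")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.ctors[name]; exists {
		return fmt.Errorf("service %q already registered", name)
	}
	r.ctors[name] = c
	return nil
}

// lookup returns the constructor registered under name, if any.
func (r *registry) lookup(name string) (constructor, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.ctors[name]
	return c, ok
}

// services returns the registered names in sorted order.
func (r *registry) services() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.ctors))
	for n := range r.ctors {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// echo is a trivial service whose name comes from its JSON config.
type echo struct{ name string }

func (e echo) Name() string { return e.name }

func newEcho(rawConfig json.RawMessage, _ *dependencies) (service, error) {
	var cfg struct {
		Name string `json:"name"`
	}
	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		return nil, err
	}
	return echo{name: cfg.Name}, nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.register("echo", newEcho), r.register("echo", newEcho))
	if ctor, ok := r.lookup("echo"); ok {
		svc, err := ctor(json.RawMessage(`{"name":"echo-1"}`), &dependencies{})
		fmt.Println(svc.Name(), err, r.services())
	}
}
```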

type RequestBodySpec

type RequestBodySpec struct {
	Description string `json:"description,omitempty"`
	ContentType string `json:"content_type,omitempty"` // defaults to "application/json"
	SchemaRef   string `json:"schema_ref,omitempty"`   // e.g. "#/components/schemas/ReviewRequest"
	Required    bool   `json:"required,omitempty"`
}

RequestBodySpec defines an operation request body

type ResponseSpec

type ResponseSpec struct {
	Description string `json:"description"`
	ContentType string `json:"content_type,omitempty"`
	SchemaRef   string `json:"schema_ref,omitempty"` // $ref to schema, e.g., "#/components/schemas/RuntimeHealthResponse"
	IsArray     bool   `json:"is_array,omitempty"`   // If true, response is an array of SchemaRef items
}

ResponseSpec defines an operation response

type RuntimeConfigurable

type RuntimeConfigurable interface {
	Configurable

	// ValidateConfigUpdate checks if the proposed changes are valid
	ValidateConfigUpdate(changes map[string]any) error

	// ApplyConfigUpdate applies validated configuration changes
	ApplyConfigUpdate(changes map[string]any) error

	// GetRuntimeConfig returns current runtime configuration values
	GetRuntimeConfig() map[string]any
}

RuntimeConfigurable is an optional interface for services that support runtime configuration updates without restart.
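The validate-then-apply contract can be sketched with a small stand-in service. The interface below omits the embedded Configurable, and `sampler`, `asInt`, and `applyUpdate` are hypothetical; the point is only that ApplyConfigUpdate is called after ValidateConfigUpdate has accepted the same change set.

```go
package main

import (
	"fmt"
	"sync"
)

// runtimeConfigurable is a trimmed local copy of the interface
// (the embedded Configurable is left out of this sketch).
type runtimeConfigurable interface {
	ValidateConfigUpdate(changes map[string]any) error
	ApplyConfigUpdate(changes map[string]any) error
	GetRuntimeConfig() map[string]any
}

// sampler is a hypothetical service with one runtime-tunable property.
type sampler struct {
	mu         sync.RWMutex
	sampleRate int
}

// asInt accepts int and whole float64 values (JSON numbers decode to float64).
func asInt(v any) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case float64:
		if n != float64(int(n)) {
			return 0, false
		}
		return int(n), true
	}
	return 0, false
}

func (s *sampler) ValidateConfigUpdate(changes map[string]any) error {
	for key, v := range changes {
		if key != "sample_rate" {
			return fmt.Errorf("property %q cannot be changed at runtime", key)
		}
		if n, ok := asInt(v); !ok || n < 0 {
			return fmt.Errorf("sample_rate must be a non-negative integer, got %v", v)
		}
	}
	return nil
}

func (s *sampler) ApplyConfigUpdate(changes map[string]any) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if v, ok := changes["sample_rate"]; ok {
		if n, ok := asInt(v); ok {
			s.sampleRate = n
		}
	}
	return nil
}

func (s *sampler) GetRuntimeConfig() map[string]any {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return map[string]any{"sample_rate": s.sampleRate}
}

// applyUpdate validates first and applies only if validation passes.
func applyUpdate(svc runtimeConfigurable, changes map[string]any) error {
	if err := svc.ValidateConfigUpdate(changes); err != nil {
		return err
	}
	return svc.ApplyConfigUpdate(changes)
}

func main() {
	s := &sampler{sampleRate: 1}
	fmt.Println(applyUpdate(s, map[string]any{"sample_rate": float64(10)}), s.GetRuntimeConfig())
	fmt.Println(applyUpdate(s, map[string]any{"max_entries": 5}), s.GetRuntimeConfig())
}
```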

type RuntimeHealthResponse

type RuntimeHealthResponse struct {
	Timestamp  time.Time         `json:"timestamp"`
	Overall    OverallHealth     `json:"overall"`
	Components []ComponentHealth `json:"components"`
}

RuntimeHealthResponse represents the JSON response for runtime health

type RuntimeMessage

type RuntimeMessage struct {
	Timestamp   string         `json:"timestamp"`
	Subject     string         `json:"subject"`
	MessageID   string         `json:"message_id"`
	Component   string         `json:"component"`
	Direction   string         `json:"direction"`
	Summary     string         `json:"summary"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	MessageType string         `json:"message_type,omitempty"`
}

RuntimeMessage represents a formatted message entry for UI consumption

type RuntimeMessagesResponse

type RuntimeMessagesResponse struct {
	Timestamp string           `json:"timestamp"`
	Messages  []RuntimeMessage `json:"messages"`
	Total     int              `json:"total"`
	Limit     int              `json:"limit"`
	Note      string           `json:"note,omitempty"`
}

RuntimeMessagesResponse represents the response structure for runtime messages

type RuntimeMetricsResponse

type RuntimeMetricsResponse struct {
	Timestamp           time.Time         `json:"timestamp"`
	PrometheusAvailable bool              `json:"prometheus_available"`
	Components          []ComponentMetric `json:"components"`
}

RuntimeMetricsResponse represents the JSON response for runtime metrics

type Schema

type Schema struct {
	Type   string `json:"type"`
	Format string `json:"format,omitempty"`
}

Schema defines parameter or response schema

type ServerSpec

type ServerSpec struct {
	URL         string `json:"url"`
	Description string `json:"description"`
}

ServerSpec defines an API server

type Service

type Service interface {
	Name() string
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
	Status() Status
	IsHealthy() bool       // Keep for compatibility during migration
	GetStatus() Info       // Keep for compatibility during migration
	Health() health.Status // NEW: Standard health reporting
	RegisterMetrics(registrar metric.MetricsRegistrar) error
}

Service interface defines the contract for all services

func NewComponentManager

func NewComponentManager(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewComponentManager creates a new ComponentManager using the standard constructor pattern

func NewFlowServiceFromConfig

func NewFlowServiceFromConfig(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewFlowServiceFromConfig creates a new flow service

func NewHeartbeatService

func NewHeartbeatService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewHeartbeatService creates a new heartbeat service using the standard constructor pattern

func NewLogForwarderService

func NewLogForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewLogForwarderService creates a new log forwarder service using the standard constructor pattern. With the new architecture, LogForwarder no longer intercepts slog - logs are published to NATS directly by the NATSLogHandler in pkg/logging. This service exists for configuration management and potential future features (e.g., log aggregation, filtering at the service level).

func NewMessageLoggerService

func NewMessageLoggerService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewMessageLoggerService creates a new message logger service using the standard constructor pattern

func NewMetrics

func NewMetrics(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewMetrics creates a new metrics service using the standard constructor pattern

func NewMetricsForwarderService

func NewMetricsForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

NewMetricsForwarderService creates a new metrics forwarder service using the standard constructor pattern

type Status

type Status int

Status represents the current status of a service

const (
	StatusStopped Status = iota
	StatusStarting
	StatusRunning
	StatusStopping
)

Possible service statuses

func (Status) String

func (s Status) String() string

String returns the string representation of Status
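For readers unfamiliar with iota-based enums, the sketch below pairs the documented constants with a String method. The returned strings ("stopped", "starting", and so on) and the "unknown" fallback are assumptions; the package's actual output is not shown in this reference.

```go
package main

import "fmt"

// Status and its constants are copied from the declarations above.
type Status int

const (
	StatusStopped Status = iota
	StatusStarting
	StatusRunning
	StatusStopping
)

// String maps each status to a label; the labels are assumed for this sketch.
func (s Status) String() string {
	switch s {
	case StatusStopped:
		return "stopped"
	case StatusStarting:
		return "starting"
	case StatusRunning:
		return "running"
	case StatusStopping:
		return "stopping"
	default:
		return "unknown"
	}
}

func main() {
	// Status implements fmt.Stringer, so %s and %v use String automatically.
	fmt.Printf("%s %v %d\n", StatusRunning, StatusStopping, StatusStopped)
}
```

Since StatusStopped is the zero value, a Status that was never set reads as stopped.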

type StatusStreamEnvelope

type StatusStreamEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	FlowID    string          `json:"flow_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
}

StatusStreamEnvelope wraps all WebSocket status stream messages
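Because Payload is a json.RawMessage, a consumer can decode the envelope first and pick the payload type from the Type field. The sketch below does this for `log_entry` messages using local copies of the two structs; `decodeLogEntry` and the sample message are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the structs documented in this package.
type StatusStreamEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	FlowID    string          `json:"flow_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
}

type LogEntryPayload struct {
	Level   string         `json:"level"`
	Source  string         `json:"source"`
	Message string         `json:"message"`
	Fields  map[string]any `json:"fields"`
}

// decodeLogEntry performs two-stage decoding: envelope first, then payload.
// The bool result is false when the message is not a log_entry.
func decodeLogEntry(raw []byte) (LogEntryPayload, bool, error) {
	var env StatusStreamEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return LogEntryPayload{}, false, err
	}
	if env.Type != "log_entry" {
		return LogEntryPayload{}, false, nil
	}
	var entry LogEntryPayload
	if err := json.Unmarshal(env.Payload, &entry); err != nil {
		return LogEntryPayload{}, false, err
	}
	return entry, true, nil
}

func main() {
	raw := []byte(`{"type":"log_entry","id":"1","timestamp":0,"flow_id":"demo",
		"payload":{"level":"WARN","source":"flow-service","message":"slow consumer","fields":{}}}`)
	entry, ok, err := decodeLogEntry(raw)
	fmt.Println(ok, err, entry.Level, entry.Source, entry.Message)
}
```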

type SubscribeAck

type SubscribeAck struct {
	Type       string   `json:"type"`                // Always "subscribe_ack"
	Subscribed []string `json:"subscribed"`          // Message types now subscribed
	LogLevel   string   `json:"log_level,omitempty"` // Current log level filter (empty = all)
	Sources    []string `json:"sources,omitempty"`   // Current source filters (empty = all)
}

SubscribeAck is the acknowledgment response sent after processing a subscribe command (Server → Client)

type SubscribeCommand

type SubscribeCommand struct {
	Command      string   `json:"command"`                 // Must be "subscribe"
	MessageTypes []string `json:"message_types,omitempty"` // Filter: flow_status, component_health, component_metrics, log_entry
	LogLevel     string   `json:"log_level,omitempty"`     // Minimum log level: DEBUG, INFO, WARN, ERROR
	Sources      []string `json:"sources,omitempty"`       // Filter by source component names
}

SubscribeCommand represents a client subscription command (Client → Server)
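A client-side subscribe message might be built as below. The struct is a local copy of SubscribeCommand; `newSubscribe` is a hypothetical helper, and the transport (the WebSocket connection itself) is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubscribeCommand is a local copy of the struct documented above.
type SubscribeCommand struct {
	Command      string   `json:"command"`
	MessageTypes []string `json:"message_types,omitempty"`
	LogLevel     string   `json:"log_level,omitempty"`
	Sources      []string `json:"sources,omitempty"`
}

// newSubscribe builds the JSON for a subscribe command. Empty filters are
// dropped from the output because the fields are tagged omitempty.
func newSubscribe(messageTypes []string, logLevel string, sources []string) ([]byte, error) {
	return json.Marshal(SubscribeCommand{
		Command:      "subscribe", // the documented required value
		MessageTypes: messageTypes,
		LogLevel:     logLevel,
		Sources:      sources,
	})
}

func main() {
	msg, err := newSubscribe([]string{"log_entry", "component_health"}, "WARN", nil)
	if err != nil {
		panic(err)
	}
	// prints: {"command":"subscribe","message_types":["log_entry","component_health"],"log_level":"WARN"}
	fmt.Println(string(msg))
}
```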

type TagSpec

type TagSpec struct {
	Name        string `json:"name"`
	Description string `json:"description"`
}

TagSpec defines an API tag for grouping operations
