Documentation ¶
Overview ¶
Package service provides base functionality and common patterns for long-running services in the SemStreams platform. It includes health monitoring, lifecycle management, and metrics collection.
Package service provides service management and HTTP APIs for the SemStreams platform.
Package service provides service lifecycle management, HTTP server coordination, and component orchestration for the StreamKit platform.
The service package separates responsibilities across several service types:
Core Service Types ¶
BaseService: Foundation for all services with standardized lifecycle management:
- Lifecycle states: Stopped → Starting → Running → Stopping
- Health monitoring with periodic checks
- Metrics integration with CoreMetrics registry
- Context-based cancellation and graceful shutdown
- Dependency injection through Dependencies
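The lifecycle states listed above can be pictured as a small state machine. The following is only an illustrative sketch: the State type, the transitions table, and canTransition are hypothetical names that are not part of this package, and the step from Stopping back to Stopped is an assumption.

```go
package main

import "fmt"

// State is a hypothetical lifecycle state mirroring the names in the list above.
type State int

const (
	Stopped State = iota
	Starting
	Running
	Stopping
)

// transitions maps each state to the single state that may follow it.
// Assumption: Stopping leads back to Stopped, closing the cycle.
var transitions = map[State]State{
	Stopped:  Starting,
	Starting: Running,
	Running:  Stopping,
	Stopping: Stopped,
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to State) bool {
	next, ok := transitions[from]
	return ok && next == to
}

func main() {
	fmt.Println("Stopped -> Starting allowed:", canTransition(Stopped, Starting))
	fmt.Println("Stopped -> Running allowed:", canTransition(Stopped, Running))
}
```

A real service would probably also need a failure path (for example Starting back to Stopped); the table form makes such additions a one-line change.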
Manager: Central orchestration of HTTP server and service lifecycle:
- HTTP server management with graceful shutdown
- Service registration and dependency injection
- Two-phase HTTP initialization (system endpoints → service endpoints)
- Health aggregation across all services
- OpenAPI documentation aggregation
ComponentManager: Dynamic component lifecycle management:
- Component instantiation from registry factories
- Flow-based component deployment
- Runtime configuration updates via NATS KV
- Flow graph validation with connectivity analysis
- Health monitoring of managed components
FlowService: Visual flow builder HTTP API:
- CRUD operations for flow definitions
- Flow deployment via Engine integration
- Real-time flow status monitoring
- Validation feedback with detailed errors
Service Patterns ¶
All services follow standardized patterns:
Constructor Pattern with Dependency Injection:
type MyService struct {
	*BaseService
	// service-specific fields
}

func NewMyService(deps Dependencies, config MyConfig) (*MyService, error) {
	base := NewBaseService("my-service", deps)
	svc := &MyService{BaseService: base}
	// Initialize service-specific fields
	return svc, nil
}
Lifecycle Implementation:
func (s *MyService) Initialize(ctx context.Context) error {
	// One-time initialization
	return s.BaseService.Initialize(ctx)
}

func (s *MyService) Start(ctx context.Context) error {
	// Start background operations
	return s.BaseService.Start(ctx)
}

func (s *MyService) Stop(timeout time.Duration) error {
	// Graceful shutdown, bounded by timeout
	return s.BaseService.Stop(timeout)
}
HTTP Handler Integration:
func (s *MyService) RegisterHTTPHandlers(mux *http.ServeMux) {
	mux.HandleFunc("/api/v1/myservice/", s.handleRequest)
}

func (s *MyService) OpenAPISpec() map[string]any {
	return map[string]any{
		"paths": map[string]any{
			// Nested values need an explicit type because the element type is any.
			"/api/v1/myservice/": map[string]any{
				"get": map[string]any{
					"summary": "My service endpoint",
					"responses": map[string]any{
						"200": map[string]any{
							"description": "Success",
						},
					},
				},
			},
		},
	}
}
Service Registration ¶
Services are registered with Manager using constructor functions:
manager := service.NewServiceManager(deps)

// Register services
manager.RegisterConstructor("my-service", func(deps Dependencies) (Service, error) {
	return NewMyService(deps, config)
})

// Initialize and start all services
if err := manager.InitializeAll(ctx); err != nil {
	log.Fatal(err)
}
if err := manager.StartAll(ctx); err != nil {
	log.Fatal(err)
}
HTTP Server Management ¶
Manager coordinates HTTP server lifecycle with two-phase initialization:
Early Phase (initializeHTTPInfrastructure):
- System endpoints registered: /health, /readyz, /metrics
- HTTP server created but not started
Late Phase (completeHTTPSetup):
- Service endpoints registered after services start
- OpenAPI documentation aggregated
- HTTP server starts listening
This prevents race conditions and ensures system endpoints are available before service-specific endpoints.
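The two phases can be sketched with the standard library alone. This is a hedged illustration rather than the Manager's implementation: registerSystemEndpoints, registerServiceEndpoints, buildMux, and statusFor are hypothetical helpers, and the handlers are placeholders.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// registerSystemEndpoints stands in for the early phase: system endpoints only.
func registerSystemEndpoints(mux *http.ServeMux) {
	for _, path := range []string{"/health", "/readyz", "/metrics"} {
		mux.HandleFunc(path, func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		})
	}
}

// registerServiceEndpoints stands in for the late phase, run after services start.
func registerServiceEndpoints(mux *http.ServeMux) {
	mux.HandleFunc("/api/v1/myservice/", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
}

// buildMux runs the early phase and, optionally, the late phase.
func buildMux(withServices bool) *http.ServeMux {
	mux := http.NewServeMux()
	registerSystemEndpoints(mux)
	if withServices {
		registerServiceEndpoints(mux)
	}
	return mux
}

// statusFor sends a GET request through the mux and returns the response code.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	early := buildMux(false)
	fmt.Println("early /health:", statusFor(early, "/health"))
	fmt.Println("early service path:", statusFor(early, "/api/v1/myservice/"))

	// Only after the late phase would the server start listening,
	// for example with http.ListenAndServe(addr, mux).
	late := buildMux(true)
	fmt.Println("late service path:", statusFor(late, "/api/v1/myservice/"))
}
```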
Health Monitoring ¶
Services implement health checks through BaseService:
// Override health check logic
func (s *MyService) healthCheck() error {
	if !s.isHealthy {
		return fmt.Errorf("service unhealthy: %v", s.lastError)
	}
	return nil
}
Health status is aggregated by Manager:
- /health - Returns 200 if any service is healthy
- /readyz - Returns 200 if all services are healthy
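The two aggregation rules above differ only in "any" versus "all". A minimal sketch, assuming per-service health is available as a map of booleans; anyHealthy, allHealthy, and statusCode are hypothetical helpers, and the 503 response for the unhealthy case is an assumption (the text above only specifies when 200 is returned).

```go
package main

import (
	"fmt"
	"net/http"
)

// anyHealthy follows the /health rule: true if at least one service is healthy.
func anyHealthy(states map[string]bool) bool {
	for _, ok := range states {
		if ok {
			return true
		}
	}
	return false
}

// allHealthy follows the /readyz rule: true only if every service is healthy.
// Assumption: an empty set of services counts as ready.
func allHealthy(states map[string]bool) bool {
	for _, ok := range states {
		if !ok {
			return false
		}
	}
	return true
}

// statusCode maps an aggregate result to an HTTP status code.
// Assumption: 503 is returned when the check fails.
func statusCode(ok bool) int {
	if ok {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

func main() {
	states := map[string]bool{"flow-service": true, "metrics": false}
	fmt.Println("/health:", statusCode(anyHealthy(states)))
	fmt.Println("/readyz:", statusCode(allHealthy(states)))
}
```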
Metrics Integration ¶
Services automatically register metrics with CoreMetrics:
- semstreams_service_status - Current service status (gauge)
- semstreams_messages_received_total - Message counter
- semstreams_messages_processed_total - Processing counter
- semstreams_health_checks_total - Health check counter
Component Management ¶
ComponentManager integrates with the component registry and flow engine:
cm := service.NewComponentManager(deps, flowEngine, configMgr)

// Deploy flow with components
err := cm.DeployFlow(ctx, flowID, flowDef)

// Runtime configuration updates
err = cm.UpdateComponentConfig(ctx, componentName, newConfig)

// Health monitoring: GetComponentStatus returns a map keyed by component name
status := cm.GetComponentStatus()[componentName]
Error Handling ¶
Services follow StreamKit error handling patterns:
- Configuration errors: Return during construction
- Initialization errors: Return from Initialize()
- Runtime errors: Log and update health status
- Shutdown errors: Log but continue graceful shutdown
Use project error wrapping for context:
import "github.com/c360studio/semstreams/pkg/errs"

if err := validateConfig(cfg); err != nil {
	return errs.WrapInvalid(err, "my-service", "NewMyService", "validate config")
}
Graceful Shutdown ¶
Manager coordinates graceful shutdown in reverse order:
- Stop accepting new HTTP requests
- Stop services in reverse registration order
- Shutdown HTTP server with timeout
- Close remaining connections
Example:
// Main application: StopAll takes the shutdown timeout directly
if err := manager.StopAll(30 * time.Second); err != nil {
	log.Printf("Graceful shutdown incomplete: %v", err)
}
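The reverse-order rule, combined with the "log but continue" policy for shutdown errors, can be sketched as follows. This is an illustration under assumptions, not the Manager's code: stopInReverse and failOn are hypothetical helpers, and services are represented only by their names.

```go
package main

import (
	"errors"
	"fmt"
)

// stopInReverse calls stop for each service in reverse registration order.
// Errors are collected rather than aborting, so every service gets a stop call.
func stopInReverse(names []string, stop func(name string) error) (order []string, errs []error) {
	for i := len(names) - 1; i >= 0; i-- {
		if err := stop(names[i]); err != nil {
			errs = append(errs, fmt.Errorf("stop %s: %w", names[i], err))
		}
		order = append(order, names[i])
	}
	return order, errs
}

// failOn returns a stop function that fails only for the named service.
// It exists purely to demonstrate the continue-on-error behaviour.
func failOn(name string) func(string) error {
	return func(n string) error {
		if n == name {
			return errors.New("timed out")
		}
		return nil
	}
}

func main() {
	registered := []string{"metrics", "component-manager", "flow-service"}
	order, errs := stopInReverse(registered, failOn("component-manager"))
	fmt.Println("stop order:", order)
	for _, err := range errs {
		fmt.Println("shutdown error:", err)
	}
}
```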
Testing ¶
The package provides ServiceSuite for integration testing with testcontainers:
func TestMyService(t *testing.T) {
	suite := service.NewServiceSuite(t)
	defer suite.Cleanup()

	// Suite provides NATS client, config manager, etc.
	svc, err := NewMyService(suite.Deps(), config)
	require.NoError(t, err)

	// Test service lifecycle
	err = svc.Initialize(suite.Context())
	require.NoError(t, err)
}
Security Considerations ¶
The service HTTP APIs are designed for internal edge deployment:
- No built-in authentication (add reverse proxy for production)
- No rate limiting (implement at gateway level)
- Path traversal protection on component endpoints
- Input validation on all HTTP handlers
For production deployments, add external security layers:
- Reverse proxy with authentication (nginx, Traefik)
- Network policies to restrict access
- TLS termination at gateway
- Rate limiting at gateway level
Example: Complete Service Implementation ¶
package main

import (
	"context"
	"log"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/c360studio/semstreams/config"
	"github.com/c360studio/semstreams/metric"
	"github.com/c360studio/semstreams/natsclient"
	"github.com/c360studio/semstreams/service"
)

func main() {
	// Load configuration
	cfg, err := config.LoadMinimalConfig("config.json")
	if err != nil {
		log.Fatal(err)
	}

	// Initialize dependencies
	natsClient, err := natsclient.NewClient(cfg.NATS)
	if err != nil {
		log.Fatal(err)
	}
	defer natsClient.Close()
	metricsRegistry := metric.NewMetricsRegistry()
	configMgr := config.NewConfigManager(natsClient, cfg)
	deps := service.Dependencies{
		NATSClient:      natsClient,
		Manager:         configMgr,
		MetricsRegistry: metricsRegistry,
		Logger:          slog.Default(),
		Platform:        cfg.Platform,
	}

	// Create service manager
	manager := service.NewServiceManager(deps)

	// Register services (flowEngine and flowStore are assumed to be created elsewhere)
	manager.RegisterConstructor("flow-service", func(d service.Dependencies) (service.Service, error) {
		return service.NewFlowService(d, flowEngine, flowStore)
	})

	// Initialize and start
	ctx := context.Background()
	if err := manager.InitializeAll(ctx); err != nil {
		log.Fatal(err)
	}
	if err := manager.StartAll(ctx); err != nil {
		log.Fatal(err)
	}

	// Wait for signal
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig

	// Graceful shutdown; StopAll takes the timeout directly
	if err := manager.StopAll(30 * time.Second); err != nil {
		log.Printf("Shutdown error: %v", err)
	}
}
For more details and examples, see the README.md in this directory.
Package service provides the Heartbeat service for emitting periodic system health logs.
Package service provides the LogForwarder service for log management.
Package service provides the MessageLogger service for observing message flow.
Package service provides the MetricsForwarder service for forwarding metrics to NATS.
Package service provides OpenAPI specification types for HTTP endpoint documentation.
Package service provides the OpenAPI registry for service specifications.
Package service provides service registration.
Package service provides service registration and management.
Index ¶
- func GetAllOpenAPISpecs() map[string]*OpenAPISpec
- func RegisterAll(registry *Registry) error
- func RegisterOpenAPISpec(name string, spec *OpenAPISpec)
- func SchemaFromType(t reflect.Type) map[string]any
- func TypeNameFromReflect(t reflect.Type) string
- type BaseService
- func (s *BaseService) GetStatus() Info
- func (s *BaseService) Health() health.Status
- func (s *BaseService) IsHealthy() bool
- func (s *BaseService) Name() string
- func (s *BaseService) OnHealthChange(callback func(bool))
- func (s *BaseService) RegisterMetrics(_ metric.MetricsRegistrar) error
- func (s *BaseService) SetHealthCheck(fn HealthCheckFunc)
- func (s *BaseService) Start(ctx context.Context) error
- func (s *BaseService) Status() Status
- func (s *BaseService) Stop(timeout time.Duration) error
- type ClientState
- func (cs *ClientState) GetLogLevel() string
- func (cs *ClientState) GetSources() []string
- func (cs *ClientState) GetSubscribedTypes() []string
- func (cs *ClientState) IsSubscribed(messageType string) bool
- func (cs *ClientState) ShouldReceiveLogLevel(level string) bool
- func (cs *ClientState) ShouldReceiveSource(source string) bool
- func (cs *ClientState) UpdateSubscription(messageTypes []string, logLevel string, sources []string)
- type ComponentGap
- type ComponentHealth
- type ComponentManager
- func (cm *ComponentManager) Component(name string) component.Discoverable
- func (cm *ComponentManager) CreateComponent(ctx context.Context, instanceName string, cfg types.ComponentConfig, ...) error
- func (cm *ComponentManager) CreateComponentsFromConfig(ctx context.Context, cfg *config.Config) error
- func (cm *ComponentManager) DetectObjectStoreGaps() []ComponentGap
- func (cm *ComponentManager) GetComponentHealth() map[string]component.HealthStatus
- func (cm *ComponentManager) GetComponentStatus() map[string]ComponentStatus
- func (cm *ComponentManager) GetFlowGraph() *flowgraph.FlowGraph
- func (cm *ComponentManager) GetFlowPaths() map[string][]string
- func (cm *ComponentManager) GetHealthyComponents() []string
- func (cm *ComponentManager) GetManagedComponents() map[string]*component.ManagedComponent
- func (cm *ComponentManager) GetRegistry() *component.Registry
- func (cm *ComponentManager) GetUnhealthyComponents() []string
- func (cm *ComponentManager) Initialize() error
- func (cm *ComponentManager) IsInitialized() bool
- func (cm *ComponentManager) IsStarted() bool
- func (cm *ComponentManager) ListComponents() map[string]component.Discoverable
- func (cm *ComponentManager) OpenAPISpec() *OpenAPISpec
- func (cm *ComponentManager) RegisterComponentErrorHook(hook func(ctx context.Context, name string, err error))
- func (cm *ComponentManager) RegisterComponentStartHook(hook func(ctx context.Context, name string, comp component.Discoverable))
- func (cm *ComponentManager) RegisterComponentStopHook(hook func(ctx context.Context, name string, reason string))
- func (cm *ComponentManager) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
- func (cm *ComponentManager) RegisterHealthChangeHook(hook func(ctx context.Context, name string, healthy bool, details string))
- func (cm *ComponentManager) RemoveComponent(instanceName string) error
- func (cm *ComponentManager) Start(ctx context.Context) error
- func (cm *ComponentManager) Stop(timeout time.Duration) error
- func (cm *ComponentManager) ValidateFlowConnectivity() *flowgraph.FlowAnalysisResult
- type ComponentManagerConfig
- type ComponentMetric
- type ComponentPortDetail
- type ComponentPortInfo
- type ComponentPortReference
- type ComponentStatus
- type ComponentsSpec
- type ConfigSchema
- type Configurable
- type Constructor
- type Dependencies
- type ErrorResponse
- type FlowConnection
- type FlowGap
- type FlowService
- type FlowServiceConfig
- type FlowStatusPayload
- type HTTPHandler
- type HealthCheckFunc
- type HeartbeatConfig
- type HeartbeatService
- type Info
- type InfoSpec
- type KVTestHelper
- func (h *KVTestHelper) AssertValidKVKey(key string)
- func (h *KVTestHelper) GetServiceConfig(service string) (map[string]any, uint64, error)
- func (h *KVTestHelper) SimulateConcurrentUpdate(service string) error
- func (h *KVTestHelper) UpdateServiceConfig(service string, updateFn func(config map[string]any) error) error
- func (h *KVTestHelper) WaitForConfigPropagation(_ time.Duration) bool
- func (h *KVTestHelper) WriteComponentConfig(componentType, name string, config map[string]any) uint64
- func (h *KVTestHelper) WriteServiceConfig(service string, config map[string]any) uint64
- type KVWatchConnectedEvent
- type KVWatchEvent
- type LogEntryPayload
- type LogForwarder
- type LogForwarderConfig
- type Manager
- func (m *Manager) ConfigureFromServices(services map[string]types.ServiceConfig, deps *Dependencies) error
- func (m *Manager) CreateService(name string, rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func (m *Manager) GetAllServiceStatus() map[string]any
- func (m *Manager) GetAllServices() map[string]Service
- func (m *Manager) GetHealthyServices() []string
- func (m *Manager) GetService(name string) (Service, bool)
- func (m *Manager) GetServiceRuntimeConfig(serviceName string) (map[string]any, error)
- func (m *Manager) GetServiceStatus(name string) (any, error)
- func (m *Manager) GetUnhealthyServices() []string
- func (m *Manager) HasConstructor(name string) bool
- func (m *Manager) ListConstructors() []string
- func (m *Manager) RemoveService(name string)
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) StartAll(ctx context.Context) error
- func (m *Manager) StartService(ctx context.Context, name string, rawConfig json.RawMessage, ...) error
- func (m *Manager) Stop(timeout time.Duration) error
- func (m *Manager) StopAll(timeout time.Duration) error
- func (m *Manager) StopService(name string, timeout time.Duration) error
- type ManagerConfig
- type MessageLogEntry
- type MessageLogger
- func (ml *MessageLogger) ApplyConfigUpdate(changes map[string]any) error
- func (ml *MessageLogger) ConfigSchema() ConfigSchema
- func (ml *MessageLogger) GetEntriesByTrace(traceID string) []MessageLogEntry
- func (ml *MessageLogger) GetLogEntries(limit int) []MessageLogEntry
- func (ml *MessageLogger) GetMessages() []MessageLogEntry
- func (ml *MessageLogger) GetRuntimeConfig() map[string]any
- func (ml *MessageLogger) GetStatistics() map[string]any
- func (ml *MessageLogger) OpenAPISpec() *OpenAPISpec
- func (ml *MessageLogger) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
- func (ml *MessageLogger) Start(ctx context.Context) error
- func (ml *MessageLogger) Stop(timeout time.Duration) error
- func (ml *MessageLogger) ValidateConfigUpdate(changes map[string]any) error
- type MessageLoggerConfig
- type MetricEntry
- type Metrics
- func (m *Metrics) ApplyConfigUpdate(changes map[string]any) error
- func (m *Metrics) ConfigSchema() ConfigSchema
- func (m *Metrics) GetRuntimeConfig() map[string]any
- func (m *Metrics) Path() string
- func (m *Metrics) Port() int
- func (m *Metrics) Start(ctx context.Context) error
- func (m *Metrics) Stop(timeout time.Duration) error
- func (m *Metrics) URL() string
- func (m *Metrics) ValidateConfigUpdate(changes map[string]any) error
- type MetricsConfig
- type MetricsForwarder
- type MetricsForwarderConfig
- type MetricsGatherer
- type MetricsPayload
- type OpenAPIDocument
- type OpenAPISpec
- type OperationSpec
- type Option
- type OverallHealth
- type ParameterSpec
- type PathSpec
- type PropertySchema
- type Registry
- type RequestBodySpec
- type ResponseSpec
- type RuntimeConfigurable
- type RuntimeHealthResponse
- type RuntimeMessage
- type RuntimeMessagesResponse
- type RuntimeMetricsResponse
- type Schema
- type ServerSpec
- type Service
- func NewComponentManager(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewFlowServiceFromConfig(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewHeartbeatService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewLogForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewMessageLoggerService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewMetrics(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- func NewMetricsForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
- type Status
- type StatusStreamEnvelope
- type SubscribeAck
- type SubscribeCommand
- type TagSpec
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetAllOpenAPISpecs ¶
func GetAllOpenAPISpecs() map[string]*OpenAPISpec
GetAllOpenAPISpecs returns all registered OpenAPI specifications. Used by the openapi-generator tool to collect specs from all services.
func RegisterAll ¶
func RegisterAll(registry *Registry) error
RegisterAll registers all built-in services with the registry. Future: this can be split into RegisterCore(), RegisterMonitoring(), etc.
func RegisterOpenAPISpec ¶
func RegisterOpenAPISpec(name string, spec *OpenAPISpec)
RegisterOpenAPISpec registers an OpenAPI specification for a service. This should be called from init() functions in service files.
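The register-from-init pattern described here usually amounts to a package-level map guarded by a mutex. The sketch below shows that pattern in isolation; spec, registerSpec, and allSpecs are hypothetical stand-ins and are not the package's OpenAPISpec, RegisterOpenAPISpec, or GetAllOpenAPISpecs.

```go
package main

import (
	"fmt"
	"sync"
)

// spec is a hypothetical, simplified stand-in for an OpenAPI specification.
type spec struct {
	Title string
}

var (
	specsMu sync.RWMutex
	specs   = map[string]*spec{}
)

// registerSpec stores a spec under a service name; later registrations overwrite.
func registerSpec(name string, s *spec) {
	specsMu.Lock()
	defer specsMu.Unlock()
	specs[name] = s
}

// allSpecs returns a copy of the registry map so callers cannot mutate it.
func allSpecs() map[string]*spec {
	specsMu.RLock()
	defer specsMu.RUnlock()
	out := make(map[string]*spec, len(specs))
	for name, s := range specs {
		out[name] = s
	}
	return out
}

// init runs after package-level variables are initialized, so the map is ready.
func init() {
	registerSpec("my-service", &spec{Title: "My service"})
}

func main() {
	for name, s := range allSpecs() {
		fmt.Println(name, "->", s.Title)
	}
}
```

Returning a copy keeps a generator tool from accidentally modifying the shared registry while it iterates.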
func SchemaFromType ¶
func SchemaFromType(t reflect.Type) map[string]any
SchemaFromType generates a JSON Schema from a reflect.Type. It handles primitives, structs, slices, maps, pointers, and time.Time.
func TypeNameFromReflect ¶
func TypeNameFromReflect(t reflect.Type) string
TypeNameFromReflect extracts a clean type name from a reflect.Type. For example: "service.RuntimeHealthResponse" -> "RuntimeHealthResponse".
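One way such a name could be derived is by stripping the package qualifier from the type's string form. The sketch below is an assumption about the approach, not the package's implementation: cleanTypeName and nameOf are hypothetical helpers, pointer dereferencing is assumed, and the local RuntimeHealthResponse type only borrows the name from the example above.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// RuntimeHealthResponse is a local placeholder type used only for the demo.
type RuntimeHealthResponse struct{}

// cleanTypeName strips the package qualifier from a named type.
// Assumption: pointers are dereferenced first. Composite types such as
// maps of named types are out of scope for this sketch.
func cleanTypeName(t reflect.Type) string {
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	s := t.String() // for example "main.RuntimeHealthResponse"
	if i := strings.LastIndex(s, "."); i >= 0 {
		return s[i+1:]
	}
	return s
}

// nameOf is a convenience wrapper for non-nil values.
func nameOf(v any) string {
	return cleanTypeName(reflect.TypeOf(v))
}

func main() {
	fmt.Println(nameOf(RuntimeHealthResponse{}))
	fmt.Println(nameOf(&RuntimeHealthResponse{}))
	fmt.Println(nameOf(42))
}
```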
Types ¶
type BaseService ¶
type BaseService struct {
// contains filtered or unexported fields
}
BaseService provides common functionality for all services
func NewBaseServiceWithOptions ¶
func NewBaseServiceWithOptions(name string, cfg *config.Config, opts ...Option) *BaseService
NewBaseServiceWithOptions creates a new base service using functional options pattern
func (*BaseService) GetStatus ¶
func (s *BaseService) GetStatus() Info
GetStatus returns the current service information
func (*BaseService) Health ¶
func (s *BaseService) Health() health.Status
Health returns the standard health status for the service
func (*BaseService) IsHealthy ¶
func (s *BaseService) IsHealthy() bool
IsHealthy returns whether the service is healthy
func (*BaseService) OnHealthChange ¶
func (s *BaseService) OnHealthChange(callback func(bool))
OnHealthChange sets a callback for health state changes
func (*BaseService) RegisterMetrics ¶
func (s *BaseService) RegisterMetrics(_ metric.MetricsRegistrar) error
RegisterMetrics allows services to register their own domain-specific metrics
func (*BaseService) SetHealthCheck ¶
func (s *BaseService) SetHealthCheck(fn HealthCheckFunc)
SetHealthCheck sets a custom health check function
func (*BaseService) Start ¶
func (s *BaseService) Start(ctx context.Context) error
Start starts the service
func (*BaseService) Status ¶
func (s *BaseService) Status() Status
Status returns the current service status
type ClientState ¶
type ClientState struct {
// contains filtered or unexported fields
}
ClientState represents client subscription state
func (*ClientState) GetLogLevel ¶
func (cs *ClientState) GetLogLevel() string
GetLogLevel returns the current log level filter
func (*ClientState) GetSources ¶
func (cs *ClientState) GetSources() []string
GetSources returns the current source filters
func (*ClientState) GetSubscribedTypes ¶
func (cs *ClientState) GetSubscribedTypes() []string
GetSubscribedTypes returns the list of currently subscribed message types
func (*ClientState) IsSubscribed ¶
func (cs *ClientState) IsSubscribed(messageType string) bool
IsSubscribed checks if the client is subscribed to a message type
func (*ClientState) ShouldReceiveLogLevel ¶
func (cs *ClientState) ShouldReceiveLogLevel(level string) bool
ShouldReceiveLogLevel checks if the client should receive logs at the given level. Log level hierarchy: DEBUG=0 < INFO=1 < WARN=2 < ERROR=3.
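The level hierarchy implies a simple threshold comparison. A minimal sketch, assuming a client receives every log at or above its configured level; levelRank and shouldReceive are hypothetical names, and the case-insensitive matching and rejection of unknown levels are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// levelRank encodes the hierarchy DEBUG=0 < INFO=1 < WARN=2 < ERROR=3.
var levelRank = map[string]int{"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}

// shouldReceive reports whether a log at level passes a client filter set to minLevel.
// Assumptions: matching ignores case, and unknown levels are rejected.
func shouldReceive(minLevel, level string) bool {
	threshold, okThreshold := levelRank[strings.ToUpper(minLevel)]
	rank, okRank := levelRank[strings.ToUpper(level)]
	return okThreshold && okRank && rank >= threshold
}

func main() {
	fmt.Println("WARN filter, ERROR log:", shouldReceive("WARN", "ERROR"))
	fmt.Println("WARN filter, INFO log:", shouldReceive("WARN", "INFO"))
}
```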
func (*ClientState) ShouldReceiveSource ¶
func (cs *ClientState) ShouldReceiveSource(source string) bool
ShouldReceiveSource checks if the client should receive logs from the given source
func (*ClientState) UpdateSubscription ¶
func (cs *ClientState) UpdateSubscription(messageTypes []string, logLevel string, sources []string)
UpdateSubscription updates the client's subscription filters. Empty arrays are treated as "keep current subscriptions" rather than "unsubscribe all".
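The "empty means keep current" rule can be sketched as follows. The subscription type and its update method are hypothetical, the message type and source names are invented for the demo, and treating an empty log level as "keep current" is an assumption that goes beyond what the description states.

```go
package main

import "fmt"

// subscription is a hypothetical, lock-free stand-in for client filter state.
type subscription struct {
	types   []string
	level   string
	sources []string
}

// update replaces a filter only when the new value is non-empty.
// Slices are copied so later changes by the caller do not leak in.
func (s *subscription) update(types []string, level string, sources []string) {
	if len(types) > 0 {
		s.types = append([]string(nil), types...)
	}
	if level != "" { // assumption: empty level keeps the current one
		s.level = level
	}
	if len(sources) > 0 {
		s.sources = append([]string(nil), sources...)
	}
}

func main() {
	s := &subscription{types: []string{"flow_status"}, level: "INFO"}
	s.update(nil, "WARN", []string{"graph-processor"})
	fmt.Println(s.types, s.level, s.sources)
}
```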
type ComponentGap ¶
type ComponentGap struct {
ComponentName string `json:"component_name"`
Issue string `json:"issue"`
Description string `json:"description"`
Suggestions []string `json:"suggestions,omitempty"`
}
ComponentGap represents a connectivity gap in the component flow
type ComponentHealth ¶
type ComponentHealth struct {
Name string `json:"name"`
Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
Type types.ComponentType `json:"type"` // Component category (input/processor/output/storage/gateway)
Status string `json:"status"` // "running", "degraded", "error", "stopped"
Healthy bool `json:"healthy"`
Message string `json:"message"`
StartTime *time.Time `json:"start_time"` // ISO 8601 timestamp, null if not started
LastActivity *time.Time `json:"last_activity"` // ISO 8601 timestamp, null if no activity
UptimeSeconds *float64 `json:"uptime_seconds"` // null if not started
Details any `json:"details"` // Additional details for degraded/error states
}
ComponentHealth represents health and timing for a single component
type ComponentManager ¶
type ComponentManager struct {
*BaseService
// contains filtered or unexported fields
}
ComponentManager handles lifecycle management of all components (inputs, processors, outputs) through the unified component system.
ComponentManager follows this lifecycle:
- Initialize() - Create components but don't start them
- Start(ctx) - Start initialized components with context
- Stop() - Stop components in reverse order
func (*ComponentManager) Component ¶
func (cm *ComponentManager) Component(name string) component.Discoverable
Component retrieves a specific component instance by name
func (*ComponentManager) CreateComponent ¶
func (cm *ComponentManager) CreateComponent(ctx context.Context, instanceName string, cfg types.ComponentConfig, deps component.Dependencies) error
CreateComponent creates a new component instance and registers it. This is for runtime component creation, not part of the normal Initialize/Start flow.
func (*ComponentManager) CreateComponentsFromConfig ¶
func (cm *ComponentManager) CreateComponentsFromConfig(ctx context.Context, cfg *config.Config) error
CreateComponentsFromConfig creates and initializes components based on configuration
func (*ComponentManager) DetectObjectStoreGaps ¶
func (cm *ComponentManager) DetectObjectStoreGaps() []ComponentGap
DetectObjectStoreGaps identifies disconnected storage components
func (*ComponentManager) GetComponentHealth ¶
func (cm *ComponentManager) GetComponentHealth() map[string]component.HealthStatus
GetComponentHealth returns current health status for all managed components. It queries each component directly through the component.Health() interface.
func (*ComponentManager) GetComponentStatus ¶
func (cm *ComponentManager) GetComponentStatus() map[string]ComponentStatus
GetComponentStatus returns combined lifecycle state and health status for all components
func (*ComponentManager) GetFlowGraph ¶
func (cm *ComponentManager) GetFlowGraph() *flowgraph.FlowGraph
GetFlowGraph returns the current FlowGraph, using cache if valid
func (*ComponentManager) GetFlowPaths ¶
func (cm *ComponentManager) GetFlowPaths() map[string][]string
GetFlowPaths returns data paths from input components to all reachable components
func (*ComponentManager) GetHealthyComponents ¶
func (cm *ComponentManager) GetHealthyComponents() []string
GetHealthyComponents returns names of components that report healthy status
func (*ComponentManager) GetManagedComponents ¶
func (cm *ComponentManager) GetManagedComponents() map[string]*component.ManagedComponent
GetManagedComponents returns a copy of all managed components with their state
func (*ComponentManager) GetRegistry ¶
func (cm *ComponentManager) GetRegistry() *component.Registry
GetRegistry returns the component registry for schema introspection. This is used by the schema API to access component schemas.
func (*ComponentManager) GetUnhealthyComponents ¶
func (cm *ComponentManager) GetUnhealthyComponents() []string
GetUnhealthyComponents returns names of components that report unhealthy status
func (*ComponentManager) Initialize ¶
func (cm *ComponentManager) Initialize() error
Initialize creates all configured components but does not start them. This follows the unified Pattern A lifecycle, where creation is separate from starting.
func (*ComponentManager) IsInitialized ¶
func (cm *ComponentManager) IsInitialized() bool
IsInitialized returns true if the component manager is initialized
func (*ComponentManager) IsStarted ¶
func (cm *ComponentManager) IsStarted() bool
IsStarted returns true if the component manager is started
func (*ComponentManager) ListComponents ¶
func (cm *ComponentManager) ListComponents() map[string]component.Discoverable
ListComponents returns all registered component instances
func (*ComponentManager) OpenAPISpec ¶
func (cm *ComponentManager) OpenAPISpec() *OpenAPISpec
OpenAPISpec returns the OpenAPI specification for ComponentManager endpoints
func (*ComponentManager) RegisterComponentErrorHook ¶
func (cm *ComponentManager) RegisterComponentErrorHook(hook func(ctx context.Context, name string, err error))
RegisterComponentErrorHook registers a callback for component error events
func (*ComponentManager) RegisterComponentStartHook ¶
func (cm *ComponentManager) RegisterComponentStartHook(hook func(ctx context.Context, name string, comp component.Discoverable))
RegisterComponentStartHook registers a callback for component start events
func (*ComponentManager) RegisterComponentStopHook ¶
func (cm *ComponentManager) RegisterComponentStopHook(hook func(ctx context.Context, name string, reason string))
RegisterComponentStopHook registers a callback for component stop events
func (*ComponentManager) RegisterHTTPHandlers ¶
func (cm *ComponentManager) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
RegisterHTTPHandlers registers HTTP endpoints for the ComponentManager service
func (*ComponentManager) RegisterHealthChangeHook ¶
func (cm *ComponentManager) RegisterHealthChangeHook(hook func(ctx context.Context, name string, healthy bool, details string))
RegisterHealthChangeHook registers a callback for health change events
func (*ComponentManager) RemoveComponent ¶
func (cm *ComponentManager) RemoveComponent(instanceName string) error
RemoveComponent stops and removes a component instance
func (*ComponentManager) Start ¶
func (cm *ComponentManager) Start(ctx context.Context) error
Start starts all initialized components with proper context flow-through
func (*ComponentManager) Stop ¶
func (cm *ComponentManager) Stop(timeout time.Duration) error
Stop gracefully stops all components in reverse order of startup
func (*ComponentManager) ValidateFlowConnectivity ¶
func (cm *ComponentManager) ValidateFlowConnectivity() *flowgraph.FlowAnalysisResult
ValidateFlowConnectivity performs FlowGraph connectivity analysis with caching
type ComponentManagerConfig ¶
type ComponentManagerConfig struct {
// WatchConfig enables dynamic configuration updates via NATS KV bucket.
// When true, the manager watches for changes to component configurations
// and applies them at runtime without service restart.
WatchConfig bool `json:"watch_config" schema:"type:boolean,description:Enable config watching via NATS KV,default:false,category:basic"`
// EnabledComponents lists component names to enable.
// If empty, all registered components are enabled.
// Use this to selectively enable specific components in a deployment.
EnabledComponents []string `json:"enabled_components" schema:"type:array,description:List of component names to enable (empty=all),category:basic"`
}
ComponentManagerConfig configures the ComponentManager service.
The ComponentManager orchestrates component lifecycle (create, start, stop) and optionally watches for configuration changes via NATS KV.
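The "empty means all" semantics of EnabledComponents can be sketched as a small filter. isEnabled is a hypothetical helper written for illustration, not a function of this package, and exact string matching on component names is an assumption.

```go
package main

import "fmt"

// isEnabled reports whether a component should be enabled.
// An empty list enables every registered component.
func isEnabled(enabled []string, name string) bool {
	if len(enabled) == 0 {
		return true
	}
	for _, e := range enabled {
		if e == name {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println("no filter, udp:", isEnabled(nil, "udp"))
	fmt.Println("filter [udp], udp:", isEnabled([]string{"udp"}, "udp"))
	fmt.Println("filter [udp], graph-processor:", isEnabled([]string{"udp"}, "graph-processor"))
}
```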
func DefaultComponentManagerConfig ¶
func DefaultComponentManagerConfig() ComponentManagerConfig
DefaultComponentManagerConfig returns the default configuration.
func (ComponentManagerConfig) Validate ¶
func (c ComponentManagerConfig) Validate() error
Validate checks if the configuration is valid
type ComponentMetric ¶
type ComponentMetric struct {
Name string `json:"name"`
Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
Type types.ComponentType `json:"type"` // Component category (input/processor/output/storage/gateway)
Status string `json:"status"`
Throughput *float64 `json:"throughput"` // msgs/sec, null if unavailable
ErrorRate *float64 `json:"error_rate"` // errors/sec, null if unavailable
QueueDepth *float64 `json:"queue_depth"` // current queue depth, null if unavailable
RawCounters *map[string]uint64 `json:"raw_counters"` // only present when Prometheus unavailable
}
ComponentMetric represents metrics for a single component
type ComponentPortDetail ¶
type ComponentPortDetail struct {
Name string `json:"name"`
Direction component.Direction `json:"direction"`
Subject string `json:"subject"`
PortType string `json:"port_type"`
}
ComponentPortDetail represents detailed information about a single port
type ComponentPortInfo ¶
type ComponentPortInfo struct {
ComponentName string `json:"component_name"`
InputPorts []ComponentPortDetail `json:"input_ports"`
OutputPorts []ComponentPortDetail `json:"output_ports"`
}
ComponentPortInfo represents port information extracted from a component
type ComponentPortReference ¶
type ComponentPortReference struct {
ComponentName string `json:"component_name"`
PortName string `json:"port_name"`
}
ComponentPortReference references a specific port on a component
type ComponentStatus ¶
type ComponentStatus struct {
Name string `json:"name"`
State component.State `json:"state"`
Health component.HealthStatus `json:"health"`
DataFlow component.FlowMetrics `json:"data_flow"`
LastError error `json:"last_error,omitempty"`
}
ComponentStatus combines lifecycle state with health and flow metrics
type ComponentsSpec ¶
ComponentsSpec holds reusable OpenAPI component definitions
type ConfigSchema ¶
type ConfigSchema struct {
component.ConfigSchema
// ServiceSpecific can hold any service-specific schema extensions
ServiceSpecific map[string]any `json:"service_specific,omitempty"`
}
ConfigSchema describes the configuration parameters for a service. We embed the component ConfigSchema for consistency across the system.
func NewConfigSchema ¶
func NewConfigSchema(properties map[string]PropertySchema, required []string) ConfigSchema
NewConfigSchema creates a service ConfigSchema with extended property schemas
type Configurable ¶
type Configurable interface {
// ConfigSchema returns the configuration schema for this service
ConfigSchema() ConfigSchema
}
Configurable is an optional interface for services that expose their configuration schema. This enables UI discovery and validation of service configurations. Services implementing this interface can describe their configuration parameters, including which fields can be changed at runtime without restart.
NOTE: This interface is reserved for future UI features. Currently the Metrics and MessageLogger services implement it. The Discovery service exists to expose this information via an HTTP API.
type Constructor ¶
type Constructor func(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
Constructor defines the standard constructor signature for all services. Every service must have a constructor that follows this pattern. The constructor receives raw JSON config and must handle its own parsing.
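A minimal sketch of a constructor that follows this signature. `Service` and `Dependencies` are reduced to local stand-ins so the example compiles on its own, and `echoConfig`, `echoService`, and `NewEchoService` are hypothetical names that do not exist in the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Service and Dependencies are simplified stand-ins for the package types.
type Service interface{ Name() string }

type Dependencies struct{}

// Constructor mirrors the documented signature.
type Constructor func(rawConfig json.RawMessage, deps *Dependencies) (Service, error)

// echoConfig and echoService are hypothetical, for illustration only.
type echoConfig struct {
	Prefix string `json:"prefix"`
}

type echoService struct{ cfg echoConfig }

func (s *echoService) Name() string { return "echo" }

// NewEchoService parses its own raw JSON config, starting from a default.
func NewEchoService(rawConfig json.RawMessage, deps *Dependencies) (Service, error) {
	cfg := echoConfig{Prefix: "echo: "} // used when the key is absent
	if len(rawConfig) > 0 {
		if err := json.Unmarshal(rawConfig, &cfg); err != nil {
			return nil, fmt.Errorf("parse echo config: %w", err)
		}
	}
	return &echoService{cfg: cfg}, nil
}

func main() {
	var ctor Constructor = NewEchoService
	svc, err := ctor(json.RawMessage(`{"prefix":"> "}`), &Dependencies{})
	if err != nil {
		fmt.Println("construct failed:", err)
		return
	}
	fmt.Println("constructed service:", svc.Name())
}
```

Setting defaults before unmarshalling means an empty or partial config still yields a usable service, while malformed JSON is reported to the caller.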
type Dependencies ¶
type Dependencies struct {
NATSClient *natsclient.Client
MetricsRegistry *metric.MetricsRegistry
Logger *slog.Logger
Platform types.PlatformMeta // Platform identity
Manager *config.Manager // Centralized configuration management
ComponentRegistry *component.Registry // Component registry for ComponentManager
ServiceManager *Manager // Service manager for accessing other services
}
Dependencies provides the standard dependencies that all services receive. This replaces the old Dependencies struct and provides consistent injection. Services should use HTTP or NATS RPC for inter-service communication.
type ErrorResponse ¶
type ErrorResponse struct {
Type string `json:"type"` // Always "error"
Code string `json:"code"` // Error code: "invalid_json", "unknown_command", "missing_command"
Message string `json:"message"` // Human-readable error message
}
ErrorResponse is sent to client when a command fails (Server → Client)
type FlowConnection ¶
type FlowConnection struct {
Publisher ComponentPortReference `json:"publisher"`
Subscriber ComponentPortReference `json:"subscriber"`
Subject string `json:"subject"`
}
FlowConnection represents a connection between publisher and subscriber
type FlowGap ¶
type FlowGap struct {
ComponentName string `json:"component_name"`
PortName string `json:"port_name"`
Subject string `json:"subject"`
Direction string `json:"direction"` // "input" or "output"
Issue string `json:"issue"` // "no_publishers" or "no_subscribers"
}
FlowGap represents a disconnected port (no matching publisher/subscriber)
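How gaps are detected is not shown here. The following is a rough sketch under two assumptions: subjects are compared by exact string equality (NATS wildcards are ignored), and ports arrive as a flat list. The `port` type and `findGaps` function are hypothetical, and `FlowGap` is a local copy without JSON tags.

```go
package main

import "fmt"

// port is a hypothetical flattened view of one component port.
type port struct {
	Component string
	Name      string
	Subject   string
	Direction string // "input" or "output"
}

// FlowGap is a local copy of the documented struct (JSON tags omitted).
type FlowGap struct {
	ComponentName string
	PortName      string
	Subject       string
	Direction     string
	Issue         string
}

// findGaps reports every port whose subject has no counterpart.
// Assumption: subjects match by exact equality; wildcards are not expanded.
func findGaps(ports []port) []FlowGap {
	published := map[string]bool{}
	subscribed := map[string]bool{}
	for _, p := range ports {
		if p.Direction == "output" {
			published[p.Subject] = true
		} else {
			subscribed[p.Subject] = true
		}
	}
	var gaps []FlowGap
	for _, p := range ports {
		switch {
		case p.Direction == "input" && !published[p.Subject]:
			gaps = append(gaps, FlowGap{p.Component, p.Name, p.Subject, p.Direction, "no_publishers"})
		case p.Direction == "output" && !subscribed[p.Subject]:
			gaps = append(gaps, FlowGap{p.Component, p.Name, p.Subject, p.Direction, "no_subscribers"})
		}
	}
	return gaps
}

func main() {
	gaps := findGaps([]port{
		{"udp", "out", "raw.udp", "output"},
		{"parser", "in", "raw.udp", "input"},
		{"parser", "out", "parsed.events", "output"},
	})
	fmt.Printf("%+v\n", gaps)
}
```

In this input only the parser's output port is reported, because nothing subscribes to `parsed.events`.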
type FlowService ¶
type FlowService struct {
*BaseService
// contains filtered or unexported fields
}
FlowService provides HTTP APIs for visual flow builder
func (*FlowService) OpenAPISpec ¶
func (fs *FlowService) OpenAPISpec() *OpenAPISpec
OpenAPISpec returns the OpenAPI specification for flow service endpoints
func (*FlowService) RegisterHTTPHandlers ¶
func (fs *FlowService) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
RegisterHTTPHandlers registers HTTP endpoints for the flow service
type FlowServiceConfig ¶
type FlowServiceConfig struct {
// PrometheusURL is the base URL for Prometheus HTTP API
// Default: http://localhost:9090
PrometheusURL string `json:"prometheus_url,omitempty"`
// FallbackToRaw enables falling back to raw metrics when Prometheus unavailable
// Default: true
FallbackToRaw bool `json:"fallback_to_raw,omitempty"`
// LogStreamBufferSize is the buffer size for SSE log streaming channel
// Larger buffers reduce dropped logs during bursts but use more memory.
// Default: 100
LogStreamBufferSize int `json:"log_stream_buffer_size,omitempty"`
}
FlowServiceConfig holds configuration for the flow service
type FlowStatusPayload ¶
type FlowStatusPayload struct {
State string `json:"state"` // Current state: draft, deployed, running, stopped, failed
PrevState string `json:"prev_state"` // Previous state (if changed)
Timestamp int64 `json:"timestamp"` // State change timestamp (Unix milliseconds)
Error string `json:"error,omitempty"` // Error message if state=failed
}
FlowStatusPayload is the payload for type=flow_status messages
type HTTPHandler ¶
type HTTPHandler interface {
RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
OpenAPISpec() *OpenAPISpec // Returns OpenAPI specification for this service
}
HTTPHandler is an optional interface for services that want to expose HTTP endpoints
type HealthCheckFunc ¶
type HealthCheckFunc func() error
HealthCheckFunc defines a custom health check function
type HeartbeatConfig ¶
type HeartbeatConfig struct {
// Interval between heartbeat logs (e.g., "30s", "1m")
// Default: "30s"
Interval string `json:"interval"`
}
HeartbeatConfig holds configuration for the Heartbeat service
func (HeartbeatConfig) Validate ¶
func (c HeartbeatConfig) Validate() error
Validate checks if the configuration is valid
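The rules Validate applies are not listed. As a sketch only, assuming Interval uses Go duration syntax (the documented examples "30s" and "1m" both parse that way) and that an empty value means the documented default, a check could look like this. `parseInterval` is a hypothetical helper, not a package function.

```go
package main

import (
	"fmt"
	"time"
)

// HeartbeatConfig is a local copy of the documented struct.
type HeartbeatConfig struct {
	Interval string `json:"interval"`
}

// parseInterval converts Interval to a duration.
// Assumption: an empty string falls back to the documented 30s default.
func parseInterval(c HeartbeatConfig) (time.Duration, error) {
	if c.Interval == "" {
		return 30 * time.Second, nil
	}
	d, err := time.ParseDuration(c.Interval)
	if err != nil {
		return 0, fmt.Errorf("invalid interval %q: %w", c.Interval, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("interval must be positive, got %s", d)
	}
	return d, nil
}

func main() {
	for _, in := range []string{"", "1m", "soon", "-5s"} {
		d, err := parseInterval(HeartbeatConfig{Interval: in})
		fmt.Printf("%q -> %v, err=%v\n", in, d, err)
	}
}
```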
type HeartbeatService ¶
type HeartbeatService struct {
*BaseService
// contains filtered or unexported fields
}
HeartbeatService emits periodic system heartbeat logs
type Info ¶
type Info struct {
Name string `json:"name"`
Status Status `json:"status"`
Uptime time.Duration `json:"uptime"`
StartTime time.Time `json:"start_time"`
MessagesProcessed int64 `json:"messages_processed"`
LastActivity time.Time `json:"last_activity"`
HealthChecks int64 `json:"health_checks"`
FailedHealthChecks int64 `json:"failed_health_checks"`
}
Info holds runtime information for a service
type InfoSpec ¶
type InfoSpec struct {
Title string `json:"title"`
Description string `json:"description"`
Version string `json:"version"`
}
InfoSpec contains API metadata
type KVTestHelper ¶
type KVTestHelper struct {
// contains filtered or unexported fields
}
KVTestHelper provides utilities for KV-based testing with JSON-only format
func NewKVTestHelper ¶
func NewKVTestHelper(t *testing.T, nc *natsclient.Client) *KVTestHelper
NewKVTestHelper creates an isolated test KV bucket
func (*KVTestHelper) AssertValidKVKey ¶
func (h *KVTestHelper) AssertValidKVKey(key string)
AssertValidKVKey validates key format per KV schema requirements
func (*KVTestHelper) GetServiceConfig ¶
GetServiceConfig reads current service configuration
func (*KVTestHelper) SimulateConcurrentUpdate ¶
func (h *KVTestHelper) SimulateConcurrentUpdate(service string) error
SimulateConcurrentUpdate tests optimistic locking behavior
func (*KVTestHelper) UpdateServiceConfig ¶
func (h *KVTestHelper) UpdateServiceConfig(service string, updateFn func(config map[string]any) error) error
UpdateServiceConfig updates with optimistic locking (uses KVStore CAS)
func (*KVTestHelper) WaitForConfigPropagation ¶
func (h *KVTestHelper) WaitForConfigPropagation(_ time.Duration) bool
WaitForConfigPropagation waits for config changes to propagate
func (*KVTestHelper) WriteComponentConfig ¶
func (h *KVTestHelper) WriteComponentConfig(componentType, name string, config map[string]any) uint64
WriteComponentConfig writes component configuration (for future use)
func (*KVTestHelper) WriteServiceConfig ¶
func (h *KVTestHelper) WriteServiceConfig(service string, config map[string]any) uint64
WriteServiceConfig writes a complete service configuration (JSON-only)
type KVWatchConnectedEvent ¶
type KVWatchConnectedEvent struct {
Bucket string `json:"bucket"`
Pattern string `json:"pattern"`
Message string `json:"message"`
}
KVWatchConnectedEvent represents the initial connection event
type KVWatchEvent ¶
type KVWatchEvent struct {
Bucket string `json:"bucket"`
Key string `json:"key"`
Operation string `json:"operation"` // "create", "update", "delete"
Value json.RawMessage `json:"value,omitempty"`
Revision uint64 `json:"revision"`
Timestamp time.Time `json:"timestamp"`
}
KVWatchEvent represents a KV change event sent via SSE
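For illustration, one way such an event could be framed for Server-Sent Events. The exact framing the service uses is not documented here, so the bare `data:` line below is an assumption, and `sseFrame` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// KVWatchEvent is a local copy of the documented struct.
type KVWatchEvent struct {
	Bucket    string          `json:"bucket"`
	Key       string          `json:"key"`
	Operation string          `json:"operation"`
	Value     json.RawMessage `json:"value,omitempty"`
	Revision  uint64          `json:"revision"`
	Timestamp time.Time       `json:"timestamp"`
}

// sseFrame renders one event as a single SSE data frame.
// A blank line terminates the frame, as the SSE format requires.
func sseFrame(ev KVWatchEvent) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return "data: " + string(b) + "\n\n", nil
}

func main() {
	frame, err := sseFrame(KVWatchEvent{
		Bucket:    "config",
		Key:       "services.metrics",
		Operation: "update",
		Value:     json.RawMessage(`{"port":9090}`),
		Revision:  7,
		Timestamp: time.Now().UTC(),
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Print(frame)
}
```

Because Value carries `omitempty`, a delete event with no value serializes without a `value` key.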
type LogEntryPayload ¶
type LogEntryPayload struct {
Level string `json:"level"` // DEBUG, INFO, WARN, ERROR
Source string `json:"source"` // Component or service name
Message string `json:"message"` // Log message
Fields map[string]any `json:"fields"` // Structured log fields
}
LogEntryPayload is the payload for type=log_entry messages
type LogForwarder ¶
type LogForwarder struct {
*BaseService
// contains filtered or unexported fields
}
LogForwarder is a service for log configuration management. With the new architecture, actual log forwarding to NATS is handled by NATSLogHandler in pkg/logging. This service validates configuration and provides a service endpoint.
func NewLogForwarder ¶
func NewLogForwarder(config *LogForwarderConfig, opts ...Option) (*LogForwarder, error)
NewLogForwarder creates a new LogForwarder service.
func (*LogForwarder) Config ¶
func (lf *LogForwarder) Config() LogForwarderConfig
Config returns the LogForwarder configuration. This can be used by other components to access the log configuration.
type LogForwarderConfig ¶
type LogForwarderConfig struct {
// MinLevel is the minimum log level to forward to NATS (DEBUG, INFO, WARN, ERROR).
// Logs below this level are still written to stdout but not published to NATS.
MinLevel string `json:"min_level"`
// ExcludeSources is a list of source prefixes to exclude from NATS forwarding.
// Logs from excluded sources still go to stdout but are not published to NATS.
// Uses prefix matching with dotted notation: excluding "flow-service.websocket"
// also excludes "flow-service.websocket.health" but NOT "flow-service".
ExcludeSources []string `json:"exclude_sources"`
}
LogForwarderConfig holds configuration for the LogForwarder service. Note: The service is enabled/disabled via types.ServiceConfig.Enabled at the outer level.
Configuration is used by:
- NATSLogHandler (in pkg/logging) for min_level and exclude_sources filtering
- This service for configuration validation
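The dotted prefix rule described for ExcludeSources can be sketched as follows. `isExcluded` is a hypothetical helper, not the NATSLogHandler implementation. It treats the dot as a segment boundary, which is an assumption consistent with the documented examples.

```go
package main

import (
	"fmt"
	"strings"
)

// isExcluded reports whether source equals an excluded entry
// or is a dotted child of one (entry + "." + anything).
func isExcluded(source string, excludeSources []string) bool {
	for _, prefix := range excludeSources {
		if source == prefix || strings.HasPrefix(source, prefix+".") {
			return true
		}
	}
	return false
}

func main() {
	exclude := []string{"flow-service.websocket"}
	sources := []string{
		"flow-service.websocket",
		"flow-service.websocket.health",
		"flow-service",
	}
	for _, src := range sources {
		fmt.Printf("%-32s excluded=%v\n", src, isExcluded(src, exclude))
	}
}
```

Matching on `prefix + "."` rather than the bare prefix keeps a parent such as "flow-service" from being excluded by one of its children.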
func (LogForwarderConfig) Validate ¶
func (c LogForwarderConfig) Validate() error
Validate checks if the configuration is valid.
type Manager ¶
type Manager struct {
*BaseService // Embed BaseService to implement Service interface
// contains filtered or unexported fields
}
Manager manages service lifecycle using a provided registry. Services are explicitly registered and created from raw JSON configs.
func NewServiceManager ¶
NewServiceManager creates a new service manager
func (*Manager) ConfigureFromServices ¶
func (m *Manager) ConfigureFromServices(services map[string]types.ServiceConfig, deps *Dependencies) error
ConfigureFromServices configures Manager directly from the services config. This replaces the old pattern where Manager was a service itself.
func (*Manager) CreateService ¶
func (m *Manager) CreateService(name string, rawConfig json.RawMessage, deps *Dependencies) (Service, error)
CreateService creates a service instance using the registered constructor
func (*Manager) GetAllServiceStatus ¶
GetAllServiceStatus returns the status of all services
func (*Manager) GetAllServices ¶
GetAllServices returns all registered service instances
func (*Manager) GetHealthyServices ¶
GetHealthyServices returns a list of healthy services
func (*Manager) GetService ¶
GetService returns a service instance by name
func (*Manager) GetServiceRuntimeConfig ¶
GetServiceRuntimeConfig returns current runtime configuration for a service
func (*Manager) GetServiceStatus ¶
GetServiceStatus returns the status of a specific service
func (*Manager) GetUnhealthyServices ¶
GetUnhealthyServices returns a list of unhealthy services
func (*Manager) HasConstructor ¶
HasConstructor checks if a constructor is registered
func (*Manager) ListConstructors ¶
ListConstructors returns all registered constructor names
func (*Manager) RemoveService ¶
RemoveService removes a service instance
func (*Manager) StartService ¶
func (m *Manager) StartService(ctx context.Context, name string, rawConfig json.RawMessage, deps *Dependencies) error
StartService creates and starts a single service if not already running
type ManagerConfig ¶
type ManagerConfig struct {
HTTPPort int `json:"http_port"`
SwaggerUI bool `json:"swagger_ui"`
ServerInfo InfoSpec `json:"server_info"`
}
ManagerConfig holds configuration for the Manager HTTP server. It is a simple struct: no UnmarshalJSON, no Enabled field.
func (ManagerConfig) Validate ¶
func (c ManagerConfig) Validate() error
Validate checks if the configuration is valid
type MessageLogEntry ¶
type MessageLogEntry struct {
Sequence uint64 `json:"sequence"` // Monotonic sequence for index validity
Timestamp time.Time `json:"timestamp"`
Subject string `json:"subject"`
MessageType string `json:"message_type,omitempty"`
MessageID string `json:"message_id,omitempty"`
TraceID string `json:"trace_id,omitempty"` // W3C trace ID (32 hex chars)
SpanID string `json:"span_id,omitempty"` // W3C span ID (16 hex chars)
Summary string `json:"summary"`
RawData json.RawMessage `json:"raw_data,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
MessageLogEntry represents a logged message
type MessageLogger ¶
type MessageLogger struct {
*BaseService
// contains filtered or unexported fields
}
MessageLogger provides message observation and logging as a service
func NewMessageLogger ¶
func NewMessageLogger( loggerConfig *MessageLoggerConfig, natsClient *natsclient.Client, opts ...Option, ) (*MessageLogger, error)
NewMessageLogger creates a new MessageLogger service
func (*MessageLogger) ApplyConfigUpdate ¶
func (ml *MessageLogger) ApplyConfigUpdate(changes map[string]any) error
ApplyConfigUpdate applies validated configuration changes. This implements the RuntimeConfigurable interface.
func (*MessageLogger) ConfigSchema ¶
func (ml *MessageLogger) ConfigSchema() ConfigSchema
ConfigSchema returns the configuration schema for this service. This implements the Configurable interface for UI discovery.
func (*MessageLogger) GetEntriesByTrace ¶
func (ml *MessageLogger) GetEntriesByTrace(traceID string) []MessageLogEntry
GetEntriesByTrace returns all log entries for a specific trace ID. Entries are returned in chronological order (by sequence number).
func (*MessageLogger) GetLogEntries ¶
func (ml *MessageLogger) GetLogEntries(limit int) []MessageLogEntry
GetLogEntries returns recent log entries with optional limit
func (*MessageLogger) GetMessages ¶
func (ml *MessageLogger) GetMessages() []MessageLogEntry
GetMessages returns recent log entries
func (*MessageLogger) GetRuntimeConfig ¶
func (ml *MessageLogger) GetRuntimeConfig() map[string]any
GetRuntimeConfig returns current configuration values. This implements the RuntimeConfigurable interface.
func (*MessageLogger) GetStatistics ¶
func (ml *MessageLogger) GetStatistics() map[string]any
GetStatistics returns runtime statistics
func (*MessageLogger) OpenAPISpec ¶
func (ml *MessageLogger) OpenAPISpec() *OpenAPISpec
OpenAPISpec returns the OpenAPI specification for MessageLogger endpoints
func (*MessageLogger) RegisterHTTPHandlers ¶
func (ml *MessageLogger) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
RegisterHTTPHandlers registers HTTP endpoints for the MessageLogger service
func (*MessageLogger) Start ¶
func (ml *MessageLogger) Start(ctx context.Context) error
Start begins message observation
func (*MessageLogger) Stop ¶
func (ml *MessageLogger) Stop(timeout time.Duration) error
Stop gracefully stops the MessageLogger
func (*MessageLogger) ValidateConfigUpdate ¶
func (ml *MessageLogger) ValidateConfigUpdate(changes map[string]any) error
ValidateConfigUpdate checks if the proposed changes are valid. This implements the RuntimeConfigurable interface.
type MessageLoggerConfig ¶
type MessageLoggerConfig struct {
// Subjects to monitor
// Use "*" to auto-discover subjects from flow component configs
// Example: ["*"] or ["*", "debug.>"] or ["raw.udp.messages", "processed.>"]
MonitorSubjects []string `json:"monitor_subjects"`
// Maximum entries to keep in memory for querying
MaxEntries int `json:"max_entries"`
// Whether to output to stdout
OutputToStdout bool `json:"output_to_stdout"`
// Log level threshold (DEBUG, INFO, WARN, ERROR)
LogLevel string `json:"log_level"`
// SampleRate controls message sampling (1 in N messages logged)
// 0 or 1 = log all messages, 10 = log 10% of messages
SampleRate int `json:"sample_rate"`
}
MessageLoggerConfig holds configuration for the MessageLogger service. It is a simple struct: no UnmarshalJSON, no Enabled field.
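The SampleRate comment describes "1 in N" sampling. A minimal counter-based sketch of that rule is shown below. The `sampler` type is hypothetical and not safe for concurrent use, and the package may sample differently (for example, randomly).

```go
package main

import "fmt"

// sampler is a hypothetical helper, not part of the package.
type sampler struct {
	rate  int
	count uint64
}

// shouldLog returns true for one message out of every rate messages.
// A rate of 0 or 1 logs everything, as the field comment describes.
func (s *sampler) shouldLog() bool {
	if s.rate <= 1 {
		return true
	}
	s.count++
	return s.count%uint64(s.rate) == 0
}

// countLogged feeds n messages through a fresh sampler and counts the hits.
func countLogged(rate, n int) int {
	s := &sampler{rate: rate}
	logged := 0
	for i := 0; i < n; i++ {
		if s.shouldLog() {
			logged++
		}
	}
	return logged
}

func main() {
	fmt.Println(countLogged(0, 100), countLogged(1, 100), countLogged(10, 100))
}
```

With 100 messages, rates 0 and 1 log all of them and rate 10 logs ten, matching the "10 = log 10% of messages" note.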
func DefaultMessageLoggerConfig ¶
func DefaultMessageLoggerConfig() MessageLoggerConfig
DefaultMessageLoggerConfig returns sensible defaults
func (MessageLoggerConfig) Validate ¶
func (c MessageLoggerConfig) Validate() error
Validate checks if the configuration is valid
type MetricEntry ¶
type MetricEntry struct {
Name string `json:"name"` // Metric name
Type string `json:"type"` // counter, gauge, histogram
Value float64 `json:"value"` // Current value
Labels map[string]string `json:"labels"` // Metric labels
}
MetricEntry represents a single metric in a MetricsPayload
type Metrics ¶
type Metrics struct {
*BaseService
// contains filtered or unexported fields
}
Metrics is a service that provides Prometheus metrics endpoint
func (*Metrics) ApplyConfigUpdate ¶
ApplyConfigUpdate applies validated runtime configuration changes
func (*Metrics) ConfigSchema ¶
func (m *Metrics) ConfigSchema() ConfigSchema
ConfigSchema returns the configuration schema for the metrics service. This implements the Configurable interface for UI discovery.
func (*Metrics) GetRuntimeConfig ¶
GetRuntimeConfig returns current runtime configuration
type MetricsConfig ¶
MetricsConfig holds configuration for the metrics service. It is a simple struct: no UnmarshalJSON, no Enabled field.
func (MetricsConfig) Validate ¶
func (c MetricsConfig) Validate() error
Validate checks if the configuration is valid
type MetricsForwarder ¶
type MetricsForwarder struct {
*BaseService
// contains filtered or unexported fields
}
MetricsForwarder implements periodic metrics publishing to NATS
type MetricsForwarderConfig ¶
type MetricsForwarderConfig struct {
// Push interval for metrics publishing (e.g., "5s", "1m")
PushInterval string `json:"push_interval"`
// IncludeGoMetrics enables forwarding of go_* runtime metrics (goroutines, memory, GC)
// Default: false (excluded to reduce noise)
IncludeGoMetrics bool `json:"include_go_metrics"`
// IncludeProcMetrics enables forwarding of process_* metrics (CPU, open FDs, memory)
// Default: false (excluded to reduce noise)
IncludeProcMetrics bool `json:"include_proc_metrics"`
}
MetricsForwarderConfig holds configuration for the MetricsForwarder service. Note: The service is enabled/disabled via types.ServiceConfig.Enabled at the outer level. If the service is created, it will forward metrics.
func (MetricsForwarderConfig) Validate ¶
func (c MetricsForwarderConfig) Validate() error
Validate checks if the configuration is valid
type MetricsGatherer ¶
type MetricsGatherer interface {
Gather() ([]*dto.MetricFamily, error)
}
MetricsGatherer defines the interface for gathering metrics. This allows for easier testing with mocks.
type MetricsPayload ¶
type MetricsPayload struct {
Component string `json:"component"` // Component name
Metrics []MetricEntry `json:"metrics"` // Array of metric values
}
MetricsPayload is the payload for type=component_metrics messages
type OpenAPIDocument ¶
type OpenAPIDocument struct {
OpenAPI string `json:"openapi"`
Info InfoSpec `json:"info"`
Servers []ServerSpec `json:"servers"`
Paths map[string]PathSpec `json:"paths"`
Components *ComponentsSpec `json:"components,omitempty"`
Tags []TagSpec `json:"tags,omitempty"`
}
OpenAPIDocument represents the complete OpenAPI 3.0 specification
type OpenAPISpec ¶
type OpenAPISpec struct {
Paths map[string]PathSpec `json:"paths"`
Components map[string]any `json:"components,omitempty"`
Tags []TagSpec `json:"tags,omitempty"`
ResponseTypes []reflect.Type `json:"-"` // Response types to generate schemas for (not serialized)
RequestBodyTypes []reflect.Type `json:"-"` // Request body types to generate schemas for (not serialized)
}
OpenAPISpec represents a service's OpenAPI specification fragment
func GetOpenAPISpec ¶
func GetOpenAPISpec(name string) (*OpenAPISpec, bool)
GetOpenAPISpec returns the OpenAPI specification for a specific service.
func NewOpenAPISpec ¶
func NewOpenAPISpec() *OpenAPISpec
NewOpenAPISpec creates a new OpenAPI specification fragment for a service
func (*OpenAPISpec) AddPath ¶
func (spec *OpenAPISpec) AddPath(path string, pathSpec PathSpec)
AddPath adds a path specification to the OpenAPI spec
func (*OpenAPISpec) AddTag ¶
func (spec *OpenAPISpec) AddTag(name, description string)
AddTag adds a tag to the OpenAPI spec
func (*OpenAPISpec) MarshalJSON ¶
func (spec *OpenAPISpec) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler for OpenAPISpec
func (*OpenAPISpec) UnmarshalJSON ¶
func (spec *OpenAPISpec) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for OpenAPISpec
type OperationSpec ¶
type OperationSpec struct {
Summary string `json:"summary"`
Description string `json:"description,omitempty"`
Parameters []ParameterSpec `json:"parameters,omitempty"`
RequestBody *RequestBodySpec `json:"request_body,omitempty"`
Responses map[string]ResponseSpec `json:"responses"`
Tags []string `json:"tags,omitempty"`
}
OperationSpec defines a single HTTP operation
type Option ¶
type Option func(*BaseService)
Option is a functional option for configuring BaseService
func OnHealthChange ¶
OnHealthChange sets a callback for health state changes
func WithHealthCheck ¶
func WithHealthCheck(fn HealthCheckFunc) Option
WithHealthCheck sets a custom health check function
func WithHealthInterval ¶
WithHealthInterval sets the health check interval
func WithLogger ¶
WithLogger sets a custom logger for the service
func WithMetrics ¶
func WithMetrics(registry *metric.MetricsRegistry) Option
WithMetrics sets the metrics registry for the service
func WithNATS ¶
func WithNATS(client *natsclient.Client) Option
WithNATS sets the NATS client for the service
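The option functions above follow the functional options pattern. The sketch below shows the mechanics with a cut-down `BaseService` stand-in. `withInterval`, `withCheck`, `newBase`, and the 30-second default are hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// BaseService is a simplified stand-in with only the fields this sketch needs.
type BaseService struct {
	name           string
	healthInterval time.Duration
	healthCheck    func() error
}

// Option mirrors the documented type.
type Option func(*BaseService)

// withInterval and withCheck are hypothetical options.
func withInterval(d time.Duration) Option {
	return func(s *BaseService) { s.healthInterval = d }
}

func withCheck(fn func() error) Option {
	return func(s *BaseService) { s.healthCheck = fn }
}

// newBase sets defaults first, then applies each option in order,
// so later options override earlier ones.
func newBase(name string, opts ...Option) *BaseService {
	s := &BaseService{name: name, healthInterval: 30 * time.Second}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newBase("demo",
		withInterval(5*time.Second),
		withCheck(func() error { return nil }),
	)
	fmt.Println(s.name, s.healthInterval, s.healthCheck() == nil)
}
```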
type OverallHealth ¶
type OverallHealth struct {
Status string `json:"status"` // "healthy", "degraded", "error"
RunningCount int `json:"running_count"`
DegradedCount int `json:"degraded_count"`
ErrorCount int `json:"error_count"`
}
OverallHealth provides aggregate health status and counts
type ParameterSpec ¶
type ParameterSpec struct {
Name string `json:"name"`
In string `json:"in"` // "query", "path", "header"
Description string `json:"description,omitempty"`
Required bool `json:"required,omitempty"`
Schema Schema `json:"schema,omitempty"`
}
ParameterSpec defines an operation parameter
type PathSpec ¶
type PathSpec struct {
GET *OperationSpec `json:"get,omitempty"`
POST *OperationSpec `json:"post,omitempty"`
PUT *OperationSpec `json:"put,omitempty"`
PATCH *OperationSpec `json:"patch,omitempty"`
DELETE *OperationSpec `json:"delete,omitempty"`
}
PathSpec defines HTTP operations for a specific path
type PropertySchema ¶
type PropertySchema struct {
component.PropertySchema
// Runtime indicates if this property can be changed without restart
Runtime bool `json:"runtime,omitempty"`
// Category groups related properties for UI organization
Category string `json:"category,omitempty"`
}
PropertySchema extends component.PropertySchema with service-specific fields
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages service constructor registration
func NewServiceRegistry ¶
func NewServiceRegistry() *Registry
NewServiceRegistry creates a new service registry
func (*Registry) Constructor ¶
func (r *Registry) Constructor(name string) (Constructor, bool)
Constructor returns a constructor for the given service name
func (*Registry) Constructors ¶
func (r *Registry) Constructors() map[string]Constructor
Constructors returns a copy of all constructors
type RequestBodySpec ¶
type RequestBodySpec struct {
Description string `json:"description,omitempty"`
ContentType string `json:"content_type,omitempty"` // defaults to "application/json"
SchemaRef string `json:"schema_ref,omitempty"` // e.g. "#/components/schemas/ReviewRequest"
Required bool `json:"required,omitempty"`
}
RequestBodySpec defines an operation request body
type ResponseSpec ¶
type ResponseSpec struct {
Description string `json:"description"`
ContentType string `json:"content_type,omitempty"`
SchemaRef string `json:"schema_ref,omitempty"` // $ref to schema, e.g., "#/components/schemas/RuntimeHealthResponse"
IsArray bool `json:"is_array,omitempty"` // If true, response is an array of SchemaRef items
}
ResponseSpec defines an operation response
type RuntimeConfigurable ¶
type RuntimeConfigurable interface {
Configurable
// ValidateConfigUpdate checks if the proposed changes are valid
ValidateConfigUpdate(changes map[string]any) error
// ApplyConfigUpdate applies validated configuration changes
ApplyConfigUpdate(changes map[string]any) error
// GetRuntimeConfig returns current runtime configuration values
GetRuntimeConfig() map[string]any
}
RuntimeConfigurable is an optional interface for services that support runtime configuration updates without restart.
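A sketch of the validate-then-apply flow this interface suggests. The embedded Configurable is omitted so the example stays self-contained, and `sampleService` and `update` are hypothetical. How the package actually sequences these calls is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// runtimeConfigurable is a trimmed stand-in for the documented interface.
type runtimeConfigurable interface {
	ValidateConfigUpdate(changes map[string]any) error
	ApplyConfigUpdate(changes map[string]any) error
	GetRuntimeConfig() map[string]any
}

// sampleService is hypothetical; only sample_rate may change at runtime.
type sampleService struct{ sampleRate int }

func (s *sampleService) ValidateConfigUpdate(changes map[string]any) error {
	for key, value := range changes {
		if key != "sample_rate" {
			return fmt.Errorf("property %q cannot be changed at runtime", key)
		}
		// JSON numbers decode to float64 when the target is map[string]any.
		n, ok := value.(float64)
		if !ok || n < 0 {
			return errors.New("sample_rate must be a non-negative number")
		}
	}
	return nil
}

func (s *sampleService) ApplyConfigUpdate(changes map[string]any) error {
	if v, ok := changes["sample_rate"].(float64); ok {
		s.sampleRate = int(v)
	}
	return nil
}

func (s *sampleService) GetRuntimeConfig() map[string]any {
	return map[string]any{"sample_rate": s.sampleRate}
}

// update validates first so a rejected change never reaches Apply.
func update(svc runtimeConfigurable, changes map[string]any) error {
	if err := svc.ValidateConfigUpdate(changes); err != nil {
		return err
	}
	return svc.ApplyConfigUpdate(changes)
}

func main() {
	svc := &sampleService{sampleRate: 1}
	fmt.Println(update(svc, map[string]any{"sample_rate": 10.0}), svc.GetRuntimeConfig())
	fmt.Println(update(svc, map[string]any{"max_entries": 5.0}), svc.GetRuntimeConfig())
}
```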
type RuntimeHealthResponse ¶
type RuntimeHealthResponse struct {
Timestamp time.Time `json:"timestamp"`
Overall OverallHealth `json:"overall"`
Components []ComponentHealth `json:"components"`
}
RuntimeHealthResponse represents the JSON response for runtime health
type RuntimeMessage ¶
type RuntimeMessage struct {
Timestamp string `json:"timestamp"`
Subject string `json:"subject"`
MessageID string `json:"message_id"`
Component string `json:"component"`
Direction string `json:"direction"`
Summary string `json:"summary"`
Metadata map[string]any `json:"metadata,omitempty"`
MessageType string `json:"message_type,omitempty"`
}
RuntimeMessage represents a formatted message entry for UI consumption
type RuntimeMessagesResponse ¶
type RuntimeMessagesResponse struct {
Timestamp string `json:"timestamp"`
Messages []RuntimeMessage `json:"messages"`
Total int `json:"total"`
Limit int `json:"limit"`
Note string `json:"note,omitempty"`
}
RuntimeMessagesResponse represents the response structure for runtime messages
type RuntimeMetricsResponse ¶
type RuntimeMetricsResponse struct {
Timestamp time.Time `json:"timestamp"`
PrometheusAvailable bool `json:"prometheus_available"`
Components []ComponentMetric `json:"components"`
}
RuntimeMetricsResponse represents the JSON response for runtime metrics
type ServerSpec ¶
ServerSpec defines an API server
type Service ¶
type Service interface {
Name() string
Start(ctx context.Context) error
Stop(timeout time.Duration) error
Status() Status
IsHealthy() bool // Keep for compatibility during migration
GetStatus() Info // Keep for compatibility during migration
Health() health.Status // NEW: Standard health reporting
RegisterMetrics(registrar metric.MetricsRegistrar) error
}
Service interface defines the contract for all services
func NewComponentManager ¶
func NewComponentManager(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewComponentManager creates a new ComponentManager using the standard constructor pattern
func NewFlowServiceFromConfig ¶
func NewFlowServiceFromConfig(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewFlowServiceFromConfig creates a new flow service
func NewHeartbeatService ¶
func NewHeartbeatService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewHeartbeatService creates a new heartbeat service using the standard constructor pattern
func NewLogForwarderService ¶
func NewLogForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewLogForwarderService creates a new log forwarder service using the standard constructor pattern. With the new architecture, LogForwarder no longer intercepts slog - logs are published to NATS directly by the NATSLogHandler in pkg/logging. This service exists for configuration management and potential future features (e.g., log aggregation, filtering at the service level).
func NewMessageLoggerService ¶
func NewMessageLoggerService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewMessageLoggerService creates a new message logger service using the standard constructor pattern
func NewMetrics ¶
func NewMetrics(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewMetrics creates a new metrics service using the standard constructor pattern
func NewMetricsForwarderService ¶
func NewMetricsForwarderService(rawConfig json.RawMessage, deps *Dependencies) (Service, error)
NewMetricsForwarderService creates a new metrics forwarder service using the standard constructor pattern
type Status ¶
type Status int
Status represents the current status of a service
Possible service statuses
type StatusStreamEnvelope ¶
type StatusStreamEnvelope struct {
Type string `json:"type"`
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
FlowID string `json:"flow_id"`
Payload json.RawMessage `json:"payload,omitempty"`
}
StatusStreamEnvelope wraps all WebSocket status stream messages
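Because Payload is a json.RawMessage, a client can decode the envelope first and then decode the payload according to Type. The sketch below uses local copies of the documented structs, trimmed to the fields it reads. The `describe` function and the sample message are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented structs, trimmed to the fields used here.
type StatusStreamEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	FlowID    string          `json:"flow_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
}

type FlowStatusPayload struct {
	State     string `json:"state"`
	PrevState string `json:"prev_state"`
}

type LogEntryPayload struct {
	Level   string `json:"level"`
	Source  string `json:"source"`
	Message string `json:"message"`
}

// describe decodes the envelope, then decodes Payload based on Type.
func describe(raw []byte) (string, error) {
	var env StatusStreamEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", err
	}
	switch env.Type {
	case "flow_status":
		var p FlowStatusPayload
		if err := json.Unmarshal(env.Payload, &p); err != nil {
			return "", err
		}
		return fmt.Sprintf("flow %s: %s -> %s", env.FlowID, p.PrevState, p.State), nil
	case "log_entry":
		var p LogEntryPayload
		if err := json.Unmarshal(env.Payload, &p); err != nil {
			return "", err
		}
		return fmt.Sprintf("[%s] %s: %s", p.Level, p.Source, p.Message), nil
	default:
		return "", fmt.Errorf("unhandled message type %q", env.Type)
	}
}

func main() {
	msg := []byte(`{"type":"flow_status","id":"1","timestamp":1700000000000,"flow_id":"f1","payload":{"state":"running","prev_state":"deployed"}}`)
	fmt.Println(describe(msg))
}
```

Deferring the payload decode keeps one envelope type for every message kind and lets clients skip types they do not handle.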
type SubscribeAck ¶
type SubscribeAck struct {
Type string `json:"type"` // Always "subscribe_ack"
Subscribed []string `json:"subscribed"` // Message types now subscribed
LogLevel string `json:"log_level,omitempty"` // Current log level filter (empty = all)
Sources []string `json:"sources,omitempty"` // Current source filters (empty = all)
}
SubscribeAck is the acknowledgment response sent after processing a subscribe command (Server → Client)
type SubscribeCommand ¶
type SubscribeCommand struct {
Command string `json:"command"` // Must be "subscribe"
MessageTypes []string `json:"message_types,omitempty"` // Filter: flow_status, component_health, component_metrics, log_entry
LogLevel string `json:"log_level,omitempty"` // Minimum log level: DEBUG, INFO, WARN, ERROR
Sources []string `json:"sources,omitempty"` // Filter by source component names
}
SubscribeCommand represents a client subscription command (Client → Server)
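As an illustration of the wire format, the struct tags above produce the JSON shown by this sketch. `encodeSubscribe` is a hypothetical helper, and the struct is a local copy of the documented one.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubscribeCommand is a local copy of the documented struct.
type SubscribeCommand struct {
	Command      string   `json:"command"`
	MessageTypes []string `json:"message_types,omitempty"`
	LogLevel     string   `json:"log_level,omitempty"`
	Sources      []string `json:"sources,omitempty"`
}

// encodeSubscribe builds the JSON a client would send to subscribe.
// Empty filters are dropped from the output because of omitempty.
func encodeSubscribe(types []string, level string, sources []string) (string, error) {
	b, err := json.Marshal(SubscribeCommand{
		Command:      "subscribe",
		MessageTypes: types,
		LogLevel:     level,
		Sources:      sources,
	})
	return string(b), err
}

func main() {
	out, err := encodeSubscribe([]string{"log_entry"}, "WARN", nil)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Leaving a filter empty omits its key entirely, which matches the "empty = all" convention noted on SubscribeAck.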
Source Files
¶
- base.go
- component_manager.go
- component_manager_config.go
- component_manager_http.go
- configurable.go
- dependencies.go
- doc.go
- flow_runtime_health.go
- flow_runtime_logs.go
- flow_runtime_messages.go
- flow_runtime_metrics.go
- flow_runtime_stream.go
- flow_service.go
- heartbeat.go
- kv_test_helpers.go
- log_forwarder.go
- message_logger.go
- message_logger_http.go
- message_logger_kv_watch.go
- metrics.go
- metrics_forwarder.go
- openapi.go
- openapi_registry.go
- openapi_types.go
- register.go
- registry.go
- schema.go
- service_manager.go
- service_manager_config.go