supervisor

package
v0.15.2
Warning

This package is not in the latest version of its module.

Published: Jan 19, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActorPool

type ActorPool struct {
	// contains filtered or unexported fields
}

ActorPool manages the lifecycle of server actors and provides stats for Supervisor. This replaces UpstreamAdapter with direct Actor integration (Phase 7.2).

func NewActorPool

func NewActorPool(manager *upstream.Manager, logger *zap.Logger) *ActorPool

NewActorPool creates a new actor pool.

func (*ActorPool) AddServer

func (p *ActorPool) AddServer(name string, cfg *config.ServerConfig) error

AddServer creates and starts an actor for the given server. If the actor already exists, it updates the configuration.

func (*ActorPool) Close

func (p *ActorPool) Close()

Close cleans up the actor pool.

func (*ActorPool) ConnectAll

func (p *ActorPool) ConnectAll(ctx context.Context) error

ConnectAll sends connect commands to all actors.

func (*ActorPool) ConnectServer

func (p *ActorPool) ConnectServer(ctx context.Context, name string) error

ConnectServer sends a connect command to an actor.

func (*ActorPool) DisconnectServer

func (p *ActorPool) DisconnectServer(name string) error

DisconnectServer sends a disconnect command to an actor.

func (*ActorPool) GetAllStates

func (p *ActorPool) GetAllStates() map[string]*ServerState

GetAllStates returns the current state of all servers.

func (*ActorPool) GetServerState

func (p *ActorPool) GetServerState(name string) (*ServerState, error)

GetServerState returns the current state of a server from its actor.

func (*ActorPool) IsUserLoggedOut

func (p *ActorPool) IsUserLoggedOut(name string) bool

IsUserLoggedOut returns true if the user explicitly logged out from the server. This prevents automatic reconnection after explicit logout.

func (*ActorPool) RemoveServer

func (p *ActorPool) RemoveServer(name string) error

RemoveServer stops and removes an actor.

func (*ActorPool) Subscribe

func (p *ActorPool) Subscribe() <-chan Event

Subscribe returns a channel that receives supervisor events.

func (*ActorPool) Unsubscribe

func (p *ActorPool) Unsubscribe(ch <-chan Event)

Unsubscribe removes a subscriber channel.
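The Subscribe/Unsubscribe pair above can be pictured as a small fan-out registry. The following is only a sketch under assumptions: the `bus` type, its buffer size, the drop-when-full policy, and closing the channel on unsubscribe are all hypothetical, since the pool's internals are unexported and not described here.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Type       string
	ServerName string
}

// bus is a hypothetical fan-out registry, not the package's implementation.
type bus struct {
	mu   sync.Mutex
	subs map[<-chan Event]chan Event
}

func newBus() *bus { return &bus{subs: make(map[<-chan Event]chan Event)} }

// Subscribe registers a buffered channel and returns its receive-only side.
func (b *bus) Subscribe() <-chan Event {
	ch := make(chan Event, 8)
	b.mu.Lock()
	b.subs[ch] = ch
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes the channel and closes it so range loops terminate.
func (b *bus) Unsubscribe(ch <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if full, ok := b.subs[ch]; ok {
		delete(b.subs, ch)
		close(full)
	}
}

// publish delivers to every subscriber and drops the event for any
// subscriber whose buffer is full, so a slow reader cannot block others.
func (b *bus) publish(e Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- e:
		default:
		}
	}
}

func main() {
	b := newBus()
	ch := b.Subscribe()
	b.publish(Event{Type: "server.connected", ServerName: "demo"})
	b.Unsubscribe(ch)
	for e := range ch { // drains the buffered event, then stops
		fmt.Println(e.Type, e.ServerName)
	}
}
```

Keying the registry by the receive-only channel lets Unsubscribe accept exactly the value that Subscribe returned.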

type ActorPoolSimple

type ActorPoolSimple struct {
	// contains filtered or unexported fields
}

ActorPoolSimple is a simplified facade over UpstreamManager that delegates all operations. Phase 7.3: Avoids double lifecycle management by using UpstreamManager's existing client management.

func NewActorPoolSimple

func NewActorPoolSimple(manager *upstream.Manager, logger *zap.Logger) *ActorPoolSimple

NewActorPoolSimple creates a simplified actor pool that delegates to UpstreamManager.

func (*ActorPoolSimple) AddServer

func (p *ActorPoolSimple) AddServer(name string, cfg *config.ServerConfig) error

AddServer adds a server configuration to the manager.

func (*ActorPoolSimple) Close

func (p *ActorPoolSimple) Close()

Close cleans up the pool.

func (*ActorPoolSimple) ConnectAll

func (p *ActorPoolSimple) ConnectAll(ctx context.Context) error

ConnectAll tells the manager to connect all servers.

func (*ActorPoolSimple) ConnectServer

func (p *ActorPoolSimple) ConnectServer(ctx context.Context, name string) error

ConnectServer tells the manager to connect a server.

func (*ActorPoolSimple) DisconnectServer

func (p *ActorPoolSimple) DisconnectServer(name string) error

DisconnectServer tells the manager to disconnect a server.

func (*ActorPoolSimple) GetAllStates

func (p *ActorPoolSimple) GetAllStates() map[string]*ServerState

GetAllStates returns the current state of all servers from the manager.

func (*ActorPoolSimple) GetServerState

func (p *ActorPoolSimple) GetServerState(name string) (*ServerState, error)

GetServerState returns the current state of a server from the manager. Phase 7.1 FIX: Fetches tools for the specific server to avoid blocking all servers.

func (*ActorPoolSimple) IsUserLoggedOut

func (p *ActorPoolSimple) IsUserLoggedOut(name string) bool

IsUserLoggedOut returns true if the user explicitly logged out from the server. This prevents automatic reconnection after explicit logout.

func (*ActorPoolSimple) RemoveServer

func (p *ActorPoolSimple) RemoveServer(name string) error

RemoveServer removes a server from the manager.

func (*ActorPoolSimple) SendNotification

func (p *ActorPoolSimple) SendNotification(notification *upstream.Notification)

SendNotification implements the upstream.NotificationHandler interface.

func (*ActorPoolSimple) Subscribe

func (p *ActorPoolSimple) Subscribe() <-chan Event

Subscribe returns a channel that receives supervisor events.

func (*ActorPoolSimple) Unsubscribe

func (p *ActorPoolSimple) Unsubscribe(ch <-chan Event)

Unsubscribe removes a subscriber channel.

type Event

type Event struct {
	Type       EventType
	ServerName string
	Timestamp  time.Time
	Payload    map[string]interface{}
}

Event represents a supervisor lifecycle event.

type EventType

type EventType string

EventType represents supervisor lifecycle events.

const (
	// EventServerAdded is emitted when a new server is added to desired state
	EventServerAdded EventType = "server.added"

	// EventServerRemoved is emitted when a server is removed from desired state
	EventServerRemoved EventType = "server.removed"

	// EventServerUpdated is emitted when a server's desired config changes
	EventServerUpdated EventType = "server.updated"

	// EventServerConnected is emitted when a server successfully connects
	EventServerConnected EventType = "server.connected"

	// EventServerDisconnected is emitted when a server disconnects
	EventServerDisconnected EventType = "server.disconnected"

	// EventServerStateChanged is emitted when a server's connection state changes
	EventServerStateChanged EventType = "server.state_changed"

	// EventReconciliationComplete is emitted after a reconciliation cycle completes
	EventReconciliationComplete EventType = "reconciliation.complete"

	// EventReconciliationFailed is emitted when reconciliation fails
	EventReconciliationFailed EventType = "reconciliation.failed"
)
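A consumer of the event channel would typically branch on Event.Type. The sketch below redeclares a few of the constants locally so that it runs on its own; the `describe` helper and the `"error"` payload key are assumptions made for illustration, as the payload contents are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of a few documented constants, so the example is self-contained.
type EventType string

const (
	EventServerConnected      EventType = "server.connected"
	EventServerDisconnected   EventType = "server.disconnected"
	EventReconciliationFailed EventType = "reconciliation.failed"
)

// Event mirrors the documented struct.
type Event struct {
	Type       EventType
	ServerName string
	Timestamp  time.Time
	Payload    map[string]interface{}
}

// describe is a hypothetical helper that turns an event into a log line.
func describe(e Event) string {
	switch e.Type {
	case EventServerConnected:
		return e.ServerName + " is up"
	case EventServerDisconnected:
		return e.ServerName + " is down"
	case EventReconciliationFailed:
		// The "error" key is an assumption, not a documented payload field.
		return fmt.Sprintf("reconciliation failed: %v", e.Payload["error"])
	default:
		return "ignored " + string(e.Type)
	}
}

func main() {
	events := make(chan Event, 2)
	events <- Event{Type: EventServerConnected, ServerName: "demo", Timestamp: time.Now()}
	events <- Event{
		Type:      EventReconciliationFailed,
		Timestamp: time.Now(),
		Payload:   map[string]interface{}{"error": "timeout"},
	}
	close(events)
	for e := range events {
		fmt.Println(describe(e))
	}
}
```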

type ReconcileAction

type ReconcileAction string

ReconcileAction describes what action the supervisor should take.

const (
	// ActionNone means no action is needed
	ActionNone ReconcileAction = "none"

	// ActionConnect means the server should be connected
	ActionConnect ReconcileAction = "connect"

	// ActionDisconnect means the server should be disconnected
	ActionDisconnect ReconcileAction = "disconnect"

	// ActionReconnect means the server should be disconnected and reconnected
	ActionReconnect ReconcileAction = "reconnect"

	// ActionRemove means the server should be removed
	ActionRemove ReconcileAction = "remove"
)
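How an action is chosen is not documented here. As a rough illustration only, one plausible rule compares what the configuration wants with what is observed. The `observed` struct, its `InConfig` and `ConfigChanged` fields, and the `decide` function are all hypothetical.

```go
package main

import "fmt"

type ReconcileAction string

const (
	ActionNone       ReconcileAction = "none"
	ActionConnect    ReconcileAction = "connect"
	ActionDisconnect ReconcileAction = "disconnect"
	ActionReconnect  ReconcileAction = "reconnect"
	ActionRemove     ReconcileAction = "remove"
)

// observed is a hypothetical, reduced view of one server's state.
type observed struct {
	InConfig      bool // still present in the desired configuration
	Enabled       bool
	Quarantined   bool
	Connected     bool
	ConfigChanged bool // desired config differs from the one in use
}

// decide maps desired-versus-actual state to a single action.
// This is an assumed policy, not the supervisor's actual logic.
func decide(s observed) ReconcileAction {
	if !s.InConfig {
		return ActionRemove
	}
	want := s.Enabled && !s.Quarantined
	switch {
	case want && !s.Connected:
		return ActionConnect
	case want && s.ConfigChanged:
		return ActionReconnect
	case !want && s.Connected:
		return ActionDisconnect
	default:
		return ActionNone
	}
}

func main() {
	fmt.Println(decide(observed{InConfig: true, Enabled: true}))
	fmt.Println(decide(observed{InConfig: true, Enabled: true, Quarantined: true, Connected: true}))
	fmt.Println(decide(observed{}))
}
```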

type ReconcilePlan

type ReconcilePlan struct {
	Actions   map[string]ReconcileAction // server name -> action
	Timestamp time.Time
	Reason    string
}

ReconcilePlan describes the actions to take during reconciliation.
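Because Actions is a map, iteration order is random. A consumer that wants stable logs or deterministic execution may sort the server names first. The `pending` helper below is a hypothetical illustration of that, using local copies of the documented types.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type ReconcileAction string

const (
	ActionNone    ReconcileAction = "none"
	ActionConnect ReconcileAction = "connect"
	ActionRemove  ReconcileAction = "remove"
)

// ReconcilePlan mirrors the documented struct.
type ReconcilePlan struct {
	Actions   map[string]ReconcileAction // server name -> action
	Timestamp time.Time
	Reason    string
}

// pending returns the names of servers that need work, sorted so that
// callers process them in a stable order. Hypothetical helper.
func pending(p ReconcilePlan) []string {
	names := make([]string, 0, len(p.Actions))
	for name, action := range p.Actions {
		if action != ActionNone {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	plan := ReconcilePlan{
		Actions: map[string]ReconcileAction{
			"c": ActionRemove,
			"a": ActionConnect,
			"b": ActionNone,
		},
		Timestamp: time.Now(),
		Reason:    "config changed",
	}
	for _, name := range pending(plan) {
		fmt.Printf("%s -> %s\n", name, plan.Actions[name])
	}
}
```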

type ServerState

type ServerState struct {
	// Desired state from configuration
	Name        string
	Config      *config.ServerConfig
	Enabled     bool
	Quarantined bool

	// Actual state from upstream manager
	Connected      bool
	ConnectionInfo *types.ConnectionInfo
	LastSeen       time.Time
	ToolCount      int
	Tools          []*config.ToolMetadata // Phase 7.1: Cached tools for lock-free reads

	// Reconciliation metadata
	DesiredVersion int64 // Config version that defines this desired state
	LastReconcile  time.Time
	ReconcileCount int
}

ServerState represents the desired and actual state of an upstream server.

type ServerStateSnapshot

type ServerStateSnapshot struct {
	Servers   map[string]*ServerState
	Timestamp time.Time
	Version   int64 // Monotonically increasing
}

ServerStateSnapshot is an immutable view of all server states.

func (*ServerStateSnapshot) Clone

Clone creates a deep copy of the snapshot.
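A deep copy here means the clone owns its own map and its own ServerState values, so changing one side never affects the other. The sketch below uses a reduced `ServerState` and a hypothetical free function `cloneSnapshot`. The real struct also holds pointer and slice fields (Config, ConnectionInfo, Tools), which would need their own copies for the clone to be fully independent.

```go
package main

import (
	"fmt"
	"time"
)

// ServerState is reduced to value fields for this sketch.
type ServerState struct {
	Name      string
	Connected bool
	ToolCount int
}

// ServerStateSnapshot mirrors the documented struct.
type ServerStateSnapshot struct {
	Servers   map[string]*ServerState
	Timestamp time.Time
	Version   int64
}

// cloneSnapshot copies the map and every state it points to.
// Hypothetical stand-in for the Clone method.
func cloneSnapshot(s *ServerStateSnapshot) *ServerStateSnapshot {
	if s == nil {
		return nil
	}
	out := &ServerStateSnapshot{
		Servers:   make(map[string]*ServerState, len(s.Servers)),
		Timestamp: s.Timestamp,
		Version:   s.Version,
	}
	for name, st := range s.Servers {
		if st == nil {
			out.Servers[name] = nil
			continue
		}
		cp := *st // copy the struct, not just the pointer
		out.Servers[name] = &cp
	}
	return out
}

func main() {
	orig := &ServerStateSnapshot{
		Servers:   map[string]*ServerState{"demo": {Name: "demo", Connected: true}},
		Timestamp: time.Now(),
		Version:   1,
	}
	cp := cloneSnapshot(orig)
	cp.Servers["demo"].Connected = false
	fmt.Println(orig.Servers["demo"].Connected, cp.Servers["demo"].Connected) // true false
}
```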

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor manages the desired vs actual state reconciliation for upstream servers. It subscribes to config changes and emits events when server states change.

func New

func New(configSvc *configsvc.Service, upstream UpstreamInterface, logger *zap.Logger) *Supervisor

New creates a new supervisor.

func (*Supervisor) CanInspect

func (s *Supervisor) CanInspect(serverName string) (bool, string, time.Duration)

CanInspect checks whether inspection is allowed for a server (circuit breaker). It returns (allowed bool, reason string, cooldownRemaining time.Duration).

func (*Supervisor) CurrentSnapshot

func (s *Supervisor) CurrentSnapshot() *ServerStateSnapshot

CurrentSnapshot returns the current state snapshot (lock-free read).
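One common way to offer lock-free reads is to publish immutable snapshots through an atomic pointer. Whether the Supervisor does this is an assumption. The `store` and `snapshot` types below are hypothetical, and the sketch requires Go 1.19 or later for `atomic.Pointer`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot is treated as immutable once published.
type snapshot struct {
	Version   int64
	Connected map[string]bool
}

// store is a hypothetical holder; it assumes a single writer goroutine.
type store struct {
	cur atomic.Pointer[snapshot]
}

// load returns the latest snapshot without taking a lock.
// It returns nil until the first publish.
func (s *store) load() *snapshot { return s.cur.Load() }

// publish swaps in a new snapshot with the next version number.
// Readers holding the old pointer keep a consistent view.
func (s *store) publish(connected map[string]bool) {
	next := &snapshot{Version: 1, Connected: connected}
	if prev := s.cur.Load(); prev != nil {
		next.Version = prev.Version + 1
	}
	s.cur.Store(next)
}

func main() {
	var s store
	fmt.Println(s.load() == nil) // true
	s.publish(map[string]bool{"demo": false})
	s.publish(map[string]bool{"demo": true})
	fmt.Println(s.load().Version, s.load().Connected["demo"]) // 2 true
}
```

Readers should treat the returned value as read-only; a writer builds a fresh snapshot rather than editing the published one.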

func (*Supervisor) GetInspectionStats

func (s *Supervisor) GetInspectionStats(serverName string) (failures int, inCooldown bool, cooldownRemaining time.Duration)

GetInspectionStats returns inspection failure statistics for a server.

func (*Supervisor) IsInspectionExempted

func (s *Supervisor) IsInspectionExempted(serverName string) bool

IsInspectionExempted checks if a server has an active inspection exemption. Automatically cleans up expired exemptions.

func (*Supervisor) RecordInspectionFailure

func (s *Supervisor) RecordInspectionFailure(serverName string)

RecordInspectionFailure records an inspection failure for the circuit breaker.

func (*Supervisor) RecordInspectionSuccess

func (s *Supervisor) RecordInspectionSuccess(serverName string)

RecordInspectionSuccess records a successful inspection, resetting the failure counter.
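The three methods above (CanInspect, RecordInspectionFailure, RecordInspectionSuccess) read like a failure-counting circuit breaker with a cooldown. The sketch below shows that general technique only. The threshold, the cooldown length, the reason string, and the `breaker` and `fakeClock` types are assumptions, and the sketch is not goroutine-safe.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock makes the example deterministic.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

// breaker is a hypothetical per-server circuit breaker.
type breaker struct {
	maxFailures int           // assumed threshold
	cooldown    time.Duration // assumed cooldown length
	failures    int
	lastFailure time.Time
	now         func() time.Time
}

// canInspect mirrors the documented return shape:
// (allowed, reason, cooldownRemaining).
func (b *breaker) canInspect() (bool, string, time.Duration) {
	if b.failures < b.maxFailures {
		return true, "", 0
	}
	remaining := b.cooldown - b.now().Sub(b.lastFailure)
	if remaining <= 0 {
		b.failures = 0 // cooldown elapsed: close the breaker again
		return true, "", 0
	}
	return false, "too many inspection failures", remaining
}

func (b *breaker) recordFailure() {
	b.failures++
	b.lastFailure = b.now()
}

func (b *breaker) recordSuccess() { b.failures = 0 }

func main() {
	clk := &fakeClock{}
	b := &breaker{maxFailures: 2, cooldown: time.Minute, now: clk.Now}
	b.recordFailure()
	b.recordFailure()
	ok, reason, remaining := b.canInspect()
	fmt.Println(ok, reason, remaining) // false too many inspection failures 1m0s
	clk.Advance(2 * time.Minute)
	ok, _, _ = b.canInspect()
	fmt.Println(ok) // true
}
```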

func (*Supervisor) RefreshToolsFromDiscovery

func (s *Supervisor) RefreshToolsFromDiscovery(tools []*config.ToolMetadata) error

RefreshToolsFromDiscovery updates both the Supervisor snapshot and StateView with tools from background discovery. This is called after DiscoverAndIndexTools completes to populate the UI cache.

func (*Supervisor) RequestInspectionExemption

func (s *Supervisor) RequestInspectionExemption(serverName string, duration time.Duration) error

RequestInspectionExemption grants temporary connection permission for a quarantined server. This allows security inspection to temporarily connect to quarantined servers. Triggers immediate reconciliation to connect the server.

func (*Supervisor) RevokeInspectionExemption

func (s *Supervisor) RevokeInspectionExemption(serverName string)

RevokeInspectionExemption revokes the temporary connection permission and triggers disconnection.
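A caller would usually pair the request with a deferred revoke so the quarantined server is disconnected again even when inspection fails. The sketch below uses the two documented method signatures through a local interface. The `inspector` interface, the `recorder` fake, the `inspectQuarantined` helper, and the 30-second duration are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// inspector lists the two documented Supervisor methods used here.
type inspector interface {
	RequestInspectionExemption(serverName string, duration time.Duration) error
	RevokeInspectionExemption(serverName string)
}

// recorder is a fake that only records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) RequestInspectionExemption(serverName string, duration time.Duration) error {
	r.calls = append(r.calls, "request:"+serverName)
	return nil
}

func (r *recorder) RevokeInspectionExemption(serverName string) {
	r.calls = append(r.calls, "revoke:"+serverName)
}

// inspectQuarantined requests an exemption, runs inspect, and always revokes.
func inspectQuarantined(s inspector, name string, inspect func() error) error {
	// 30 seconds is an arbitrary example value.
	if err := s.RequestInspectionExemption(name, 30*time.Second); err != nil {
		return fmt.Errorf("request exemption for %s: %w", name, err)
	}
	defer s.RevokeInspectionExemption(name)
	return inspect()
}

func main() {
	r := &recorder{}
	err := inspectQuarantined(r, "demo", func() error {
		r.calls = append(r.calls, "inspect")
		return nil
	})
	fmt.Println(err, r.calls) // <nil> [request:demo inspect revoke:demo]
}
```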

func (*Supervisor) SetOnServerConnectedCallback

func (s *Supervisor) SetOnServerConnectedCallback(callback func(serverName string))

SetOnServerConnectedCallback sets a callback to be invoked when a server connects. This allows for reactive tool discovery instead of relying on periodic polling.
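A callback of this shape is best kept short, for example by handing the server name to a worker queue instead of doing discovery inline. The sketch below uses a stand-in `notifier` type with the same setter signature. `simulateConnect`, `enqueue`, and the queue size are hypothetical.

```go
package main

import "fmt"

// notifier is a stand-in exposing the documented setter signature.
type notifier struct {
	onConnected func(serverName string)
}

func (n *notifier) SetOnServerConnectedCallback(callback func(serverName string)) {
	n.onConnected = callback
}

// simulateConnect fires the callback as a supervisor might after a connect.
func (n *notifier) simulateConnect(name string) {
	if n.onConnected != nil {
		n.onConnected(name)
	}
}

// enqueue returns a callback that hands the name to a worker queue
// without blocking the caller; names are dropped when the queue is full.
func enqueue(queue chan<- string) func(string) {
	return func(name string) {
		select {
		case queue <- name:
		default:
		}
	}
}

func main() {
	queue := make(chan string, 4)
	n := &notifier{}
	n.SetOnServerConnectedCallback(enqueue(queue))
	n.simulateConnect("demo")
	close(queue)
	for name := range queue {
		fmt.Println("discover tools for", name)
	}
}
```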

func (*Supervisor) Start

func (s *Supervisor) Start()

Start begins the supervisor's reconciliation loop.

func (*Supervisor) StateView

func (s *Supervisor) StateView() *stateview.View

StateView returns the read-only state view (Phase 4). This provides a lock-free view of server statuses for API consumers.

func (*Supervisor) Stop

func (s *Supervisor) Stop()

Stop gracefully stops the supervisor.

func (*Supervisor) Subscribe

func (s *Supervisor) Subscribe() <-chan Event

Subscribe returns a channel that receives supervisor events.

func (*Supervisor) Unsubscribe

func (s *Supervisor) Unsubscribe(ch <-chan Event)

Unsubscribe removes a subscriber.

type UpstreamInterface

type UpstreamInterface interface {
	AddServer(name string, cfg *config.ServerConfig) error
	RemoveServer(name string) error
	ConnectServer(ctx context.Context, name string) error
	DisconnectServer(name string) error
	ConnectAll(ctx context.Context) error
	GetServerState(name string) (*ServerState, error)
	GetAllStates() map[string]*ServerState
	IsUserLoggedOut(name string) bool // Returns true if user explicitly logged out (prevents auto-reconnect)
	Subscribe() <-chan Event
	Unsubscribe(ch <-chan Event)
	Close()
}

UpstreamInterface defines the interface for upstream adapters.
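For tests, an in-memory fake that satisfies this method set can stand in for a real upstream. The sketch below redeclares the interface against simplified local types so that it compiles on its own. `ServerConfig` with a `URL` field, the reduced `ServerState` and `Event`, and the `fakeUpstream` type are all hypothetical stand-ins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-ins; the real types carry more fields.
type ServerConfig struct{ URL string }
type ServerState struct {
	Name      string
	Config    *ServerConfig
	Connected bool
}
type Event struct{ Type, ServerName string }

// UpstreamInterface mirrors the documented method set using the stand-ins.
type UpstreamInterface interface {
	AddServer(name string, cfg *ServerConfig) error
	RemoveServer(name string) error
	ConnectServer(ctx context.Context, name string) error
	DisconnectServer(name string) error
	ConnectAll(ctx context.Context) error
	GetServerState(name string) (*ServerState, error)
	GetAllStates() map[string]*ServerState
	IsUserLoggedOut(name string) bool
	Subscribe() <-chan Event
	Unsubscribe(ch <-chan Event)
	Close()
}

var errUnknown = errors.New("unknown server")

// fakeUpstream keeps state in a map; it is not goroutine-safe.
type fakeUpstream struct{ states map[string]*ServerState }

var _ UpstreamInterface = (*fakeUpstream)(nil) // compile-time check

func newFake() *fakeUpstream { return &fakeUpstream{states: map[string]*ServerState{}} }

func (f *fakeUpstream) AddServer(name string, cfg *ServerConfig) error {
	f.states[name] = &ServerState{Name: name, Config: cfg}
	return nil
}
func (f *fakeUpstream) RemoveServer(name string) error { delete(f.states, name); return nil }
func (f *fakeUpstream) setConnected(name string, v bool) error {
	st, ok := f.states[name]
	if !ok {
		return errUnknown
	}
	st.Connected = v
	return nil
}
func (f *fakeUpstream) ConnectServer(_ context.Context, name string) error {
	return f.setConnected(name, true)
}
func (f *fakeUpstream) DisconnectServer(name string) error { return f.setConnected(name, false) }
func (f *fakeUpstream) ConnectAll(ctx context.Context) error {
	for name := range f.states {
		if err := f.ConnectServer(ctx, name); err != nil {
			return err
		}
	}
	return nil
}
func (f *fakeUpstream) GetServerState(name string) (*ServerState, error) {
	st, ok := f.states[name]
	if !ok {
		return nil, errUnknown
	}
	return st, nil
}
func (f *fakeUpstream) GetAllStates() map[string]*ServerState { return f.states }
func (f *fakeUpstream) IsUserLoggedOut(string) bool           { return false }
func (f *fakeUpstream) Subscribe() <-chan Event               { return make(chan Event) }
func (f *fakeUpstream) Unsubscribe(<-chan Event)              {}
func (f *fakeUpstream) Close()                                {}

func main() {
	var up UpstreamInterface = newFake()
	_ = up.AddServer("demo", &ServerConfig{URL: "http://localhost:0"})
	_ = up.ConnectAll(context.Background())
	st, _ := up.GetServerState("demo")
	fmt.Println(st.Name, st.Connected) // demo true
}
```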
