Documentation
¶
Overview ¶
Package gateway provides a central FlowWatch gateway that aggregates multiple subsystem FlowWatch instances behind a single EngineAdapter. Subsystems register via a persistent bidirectional gRPC stream, and the gateway proxies UI requests to the appropriate subsystem.
Index ¶
- func ConnectToGateway(ctx context.Context, opts ...ClientOption) error
- type Adapter
- func (a *Adapter) CancelRun(ctx context.Context, runID string, reason string) error
- func (a *Adapter) Capabilities() *adapter.EngineCapabilities
- func (a *Adapter) DeregisterSubsystem(subsystemID string)
- func (a *Adapter) EngineName() string
- func (a *Adapter) GetFailureRate(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.FailureRatePoint, error)
- func (a *Adapter) GetGraph(ctx context.Context, workflowID string) (*adapter.GraphData, error)
- func (a *Adapter) GetLatency(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.LatencyPoint, error)
- func (a *Adapter) GetRun(ctx context.Context, runID string) (*adapter.RunData, error)
- func (a *Adapter) GetRunLogs(ctx context.Context, runID string, stepName string, cursor string, limit int) ([]adapter.LogData, string, error)
- func (a *Adapter) GetRunSteps(ctx context.Context, runID string) ([]adapter.StepData, error)
- func (a *Adapter) GetRunTimeline(ctx context.Context, runID string) (*adapter.TimelineData, error)
- func (a *Adapter) GetStepDuration(ctx context.Context, p adapter.StepDurationParams) (*adapter.StepDurationReport, error)
- func (a *Adapter) GetStepHeatmap(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.HeatmapPoint, error)
- func (a *Adapter) GetSubsystem(ctx context.Context, subsystemID string) (*adapter.SubsystemDetail, error)
- func (a *Adapter) GetThroughput(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.ThroughputPoint, error)
- func (a *Adapter) GetTrace(ctx context.Context, traceID string) (*adapter.TraceData, error)
- func (a *Adapter) GetWorkflow(ctx context.Context, workflowID string) (*adapter.WorkflowDef, error)
- func (a *Adapter) ListRuns(ctx context.Context, filter adapter.RunFilter, cursor string, limit int) ([]adapter.RunData, string, error)
- func (a *Adapter) ListSubsystems(_ context.Context) ([]adapter.SubsystemSummary, error)
- func (a *Adapter) ListWorkflows(ctx context.Context) ([]adapter.WorkflowDef, error)
- func (a *Adapter) MarkStale(subsystemID string)
- func (a *Adapter) PauseRun(ctx context.Context, runID string) error
- func (a *Adapter) RegisterSubsystem(info SubsystemInfo)
- func (a *Adapter) ResumeRun(ctx context.Context, runID string) error
- func (a *Adapter) RetryFromStep(ctx context.Context, runID, stepName string) (string, error)
- func (a *Adapter) RetryRun(ctx context.Context, runID string) (string, error)
- func (a *Adapter) SkipStep(ctx context.Context, runID, stepName string, reason string) error
- func (a *Adapter) SubscribeRun(_ context.Context, _ string) (<-chan adapter.RunDetailEvent, error)
- func (a *Adapter) SubscribeRuns(_ context.Context, _ adapter.RunFilter) (<-chan adapter.RunEvent, error)
- type AdapterOption
- type ClientOption
- func WithClientAPIKey(key string) ClientOption
- func WithClientHTTPClient(c *http.Client) ClientOption
- func WithDescription(desc string) ClientOption
- func WithGatewayAddr(addr string) ClientOption
- func WithLocalEndpoint(endpoint string) ClientOption
- func WithSubsystemID(id string) ClientOption
- func WithSubsystemName(name string) ClientOption
- type HealthMonitor
- type Server
- type SubsystemInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConnectToGateway ¶
func ConnectToGateway(ctx context.Context, opts ...ClientOption) error
ConnectToGateway opens a persistent registration stream to the gateway and handles heartbeats and automatic reconnection. The subsystem must still run its own FlowWatch HTTP server for the gateway to proxy requests to.
This function blocks until ctx is cancelled. It reconnects with exponential backoff on stream errors.
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
Adapter implements adapter.EngineAdapter by routing requests to registered subsystem FlowWatch instances.
func NewAdapter ¶
func NewAdapter(opts ...AdapterOption) *Adapter
NewAdapter creates a new gateway adapter.
func (*Adapter) Capabilities ¶
func (a *Adapter) Capabilities() *adapter.EngineCapabilities
func (*Adapter) DeregisterSubsystem ¶
DeregisterSubsystem removes a subsystem from the registry.
func (*Adapter) EngineName ¶
func (*Adapter) GetFailureRate ¶
func (a *Adapter) GetFailureRate(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.FailureRatePoint, error)
func (*Adapter) GetLatency ¶
func (a *Adapter) GetLatency(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.LatencyPoint, error)
func (*Adapter) GetRunLogs ¶
func (*Adapter) GetRunSteps ¶
func (*Adapter) GetRunTimeline ¶
func (*Adapter) GetStepDuration ¶
func (a *Adapter) GetStepDuration(ctx context.Context, p adapter.StepDurationParams) (*adapter.StepDurationReport, error)
func (*Adapter) GetStepHeatmap ¶
func (a *Adapter) GetStepHeatmap(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.HeatmapPoint, error)
func (*Adapter) GetSubsystem ¶
func (*Adapter) GetThroughput ¶
func (a *Adapter) GetThroughput(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.ThroughputPoint, error)
func (*Adapter) GetWorkflow ¶
func (*Adapter) ListSubsystems ¶
func (*Adapter) ListWorkflows ¶
func (*Adapter) MarkStale ¶
MarkStale marks a subsystem as stale (disconnected but within grace period).
func (*Adapter) RegisterSubsystem ¶
func (a *Adapter) RegisterSubsystem(info SubsystemInfo)
RegisterSubsystem adds a subsystem to the gateway registry.
func (*Adapter) RetryFromStep ¶
func (*Adapter) SubscribeRun ¶
type AdapterOption ¶
type AdapterOption func(*adapterConfig)
AdapterOption configures the GatewayAdapter.
func WithAPIKey ¶
func WithAPIKey(key string) AdapterOption
WithAPIKey sets the shared secret that subsystems must present to register. An empty key disables authentication.
func WithGracePeriod ¶
func WithGracePeriod(d time.Duration) AdapterOption
WithGracePeriod sets how long a disconnected subsystem remains in the registry before being removed. Default: 30s.
func WithHTTPClient ¶
func WithHTTPClient(c *http.Client) AdapterOption
WithHTTPClient sets the HTTP client used to create Connect-Go clients for proxying requests to subsystems.
func WithHeartbeatInterval ¶
func WithHeartbeatInterval(d time.Duration) AdapterOption
WithHeartbeatInterval sets the interval between heartbeat pings sent to subsystems. Default: 10s.
func WithRunCacheSize ¶
func WithRunCacheSize(n int) AdapterOption
WithRunCacheSize sets the maximum number of run-to-subsystem mappings to cache. Default: 100,000.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption configures the subsystem-side gateway client.
func WithClientAPIKey ¶
func WithClientAPIKey(key string) ClientOption
WithClientAPIKey sets the shared secret for authentication with the gateway.
func WithClientHTTPClient ¶
func WithClientHTTPClient(c *http.Client) ClientOption
WithClientHTTPClient sets the HTTP client for the gateway connection.
func WithDescription ¶
func WithDescription(desc string) ClientOption
WithDescription sets the subsystem description.
func WithGatewayAddr ¶
func WithGatewayAddr(addr string) ClientOption
WithGatewayAddr sets the gateway server address to connect to.
func WithLocalEndpoint ¶
func WithLocalEndpoint(endpoint string) ClientOption
WithLocalEndpoint sets the FlowWatch HTTP endpoint of this subsystem that the gateway will connect back to for proxying requests.
func WithSubsystemID ¶
func WithSubsystemID(id string) ClientOption
WithSubsystemID sets the unique subsystem identifier.
func WithSubsystemName ¶
func WithSubsystemName(name string) ClientOption
WithSubsystemName sets the human-readable subsystem name.
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor periodically checks subsystem connections and removes subsystems that have been stale beyond the grace period.
func NewHealthMonitor ¶
func NewHealthMonitor(a *Adapter) *HealthMonitor
NewHealthMonitor creates a new health monitor. The check interval defaults to half the heartbeat interval for timely stale detection.
func (*HealthMonitor) Start ¶
func (h *HealthMonitor) Start(ctx context.Context)
Start begins the background health check loop.
func (*HealthMonitor) Stop ¶
func (h *HealthMonitor) Stop()
Stop waits for the health monitor to finish.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server implements the GatewayServiceHandler interface for subsystem registration via bidirectional streams.
func (*Server) Register ¶
func (s *Server) Register(ctx context.Context, stream *connect.BidiStream[flowwatchv1.SubsystemMessage, flowwatchv1.GatewayMessage]) error
Register handles a bidirectional registration stream from a subsystem.
type SubsystemInfo ¶
type SubsystemInfo struct {
SubsystemID string
SubsystemName string
Description string
Endpoint string
Workflows []*flowwatchv1.WorkflowDefinition
Capabilities *flowwatchv1.Capabilities
Metadata map[string]string
}
SubsystemInfo holds metadata about a registered subsystem.