gateway

package
v0.0.0-...-312cec9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2026 License: BSD-3-Clause Imports: 13 Imported by: 0

Documentation

Overview

Package gateway provides a central FlowWatch gateway that aggregates multiple subsystem FlowWatch instances behind a single EngineAdapter. Subsystems register via a persistent bidirectional gRPC stream, and the gateway proxies UI requests to the appropriate subsystem.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConnectToGateway

func ConnectToGateway(ctx context.Context, opts ...ClientOption) error

ConnectToGateway opens a persistent registration stream to the gateway and handles heartbeats and automatic reconnection. The subsystem must still run its own FlowWatch HTTP server for the gateway to proxy requests to.

This function blocks until ctx is cancelled. It reconnects with exponential backoff on stream errors.

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter implements adapter.EngineAdapter by routing requests to registered subsystem FlowWatch instances.

func NewAdapter

func NewAdapter(opts ...AdapterOption) *Adapter

NewAdapter creates a new gateway adapter.

func (*Adapter) CancelRun

func (a *Adapter) CancelRun(ctx context.Context, runID string, reason string) error

func (*Adapter) Capabilities

func (a *Adapter) Capabilities() *adapter.EngineCapabilities

func (*Adapter) DeregisterSubsystem

func (a *Adapter) DeregisterSubsystem(subsystemID string)

DeregisterSubsystem removes a subsystem from the registry.

func (*Adapter) EngineName

func (a *Adapter) EngineName() string

func (*Adapter) GetFailureRate

func (*Adapter) GetGraph

func (a *Adapter) GetGraph(ctx context.Context, workflowID string) (*adapter.GraphData, error)

func (*Adapter) GetLatency

func (*Adapter) GetRun

func (a *Adapter) GetRun(ctx context.Context, runID string) (*adapter.RunData, error)

func (*Adapter) GetRunLogs

func (a *Adapter) GetRunLogs(ctx context.Context, runID string, stepName string, cursor string, limit int) ([]adapter.LogData, string, error)

func (*Adapter) GetRunSteps

func (a *Adapter) GetRunSteps(ctx context.Context, runID string) ([]adapter.StepData, error)

func (*Adapter) GetRunTimeline

func (a *Adapter) GetRunTimeline(ctx context.Context, runID string) (*adapter.TimelineData, error)

func (*Adapter) GetStepDuration

func (*Adapter) GetStepHeatmap

func (a *Adapter) GetStepHeatmap(ctx context.Context, p adapter.AnalyticsParams) ([]adapter.HeatmapPoint, error)

func (*Adapter) GetSubsystem

func (a *Adapter) GetSubsystem(ctx context.Context, subsystemID string) (*adapter.SubsystemDetail, error)

func (*Adapter) GetThroughput

func (*Adapter) GetTrace

func (a *Adapter) GetTrace(ctx context.Context, traceID string) (*adapter.TraceData, error)

func (*Adapter) GetWorkflow

func (a *Adapter) GetWorkflow(ctx context.Context, workflowID string) (*adapter.WorkflowDef, error)

func (*Adapter) ListRuns

func (a *Adapter) ListRuns(ctx context.Context, filter adapter.RunFilter, cursor string, limit int) ([]adapter.RunData, string, error)

func (*Adapter) ListSubsystems

func (a *Adapter) ListSubsystems(_ context.Context) ([]adapter.SubsystemSummary, error)

func (*Adapter) ListWorkflows

func (a *Adapter) ListWorkflows(ctx context.Context) ([]adapter.WorkflowDef, error)

func (*Adapter) MarkStale

func (a *Adapter) MarkStale(subsystemID string)

MarkStale marks a subsystem as stale (disconnected but within grace period).

func (*Adapter) PauseRun

func (a *Adapter) PauseRun(ctx context.Context, runID string) error

func (*Adapter) RegisterSubsystem

func (a *Adapter) RegisterSubsystem(info SubsystemInfo)

RegisterSubsystem adds a subsystem to the gateway registry.

func (*Adapter) ResumeRun

func (a *Adapter) ResumeRun(ctx context.Context, runID string) error

func (*Adapter) RetryFromStep

func (a *Adapter) RetryFromStep(ctx context.Context, runID, stepName string) (string, error)

func (*Adapter) RetryRun

func (a *Adapter) RetryRun(ctx context.Context, runID string) (string, error)

func (*Adapter) SkipStep

func (a *Adapter) SkipStep(ctx context.Context, runID, stepName string, reason string) error

func (*Adapter) SubscribeRun

func (a *Adapter) SubscribeRun(_ context.Context, _ string) (<-chan adapter.RunDetailEvent, error)

func (*Adapter) SubscribeRuns

func (a *Adapter) SubscribeRuns(_ context.Context, _ adapter.RunFilter) (<-chan adapter.RunEvent, error)

type AdapterOption

type AdapterOption func(*adapterConfig)

AdapterOption configures the GatewayAdapter.

func WithAPIKey

func WithAPIKey(key string) AdapterOption

WithAPIKey sets the shared secret that subsystems must present to register. An empty key disables authentication.

func WithGracePeriod

func WithGracePeriod(d time.Duration) AdapterOption

WithGracePeriod sets how long a disconnected subsystem remains in the registry before being removed. Default: 30s.

func WithHTTPClient

func WithHTTPClient(c *http.Client) AdapterOption

WithHTTPClient sets the HTTP client used to create Connect-Go clients for proxying requests to subsystems.

func WithHeartbeatInterval

func WithHeartbeatInterval(d time.Duration) AdapterOption

WithHeartbeatInterval sets the interval between heartbeat pings sent to subsystems. Default: 10s.

func WithRunCacheSize

func WithRunCacheSize(n int) AdapterOption

WithRunCacheSize sets the maximum number of run-to-subsystem mappings to cache. Default: 100,000.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption configures the subsystem-side gateway client.

func WithClientAPIKey

func WithClientAPIKey(key string) ClientOption

WithClientAPIKey sets the shared secret for authentication with the gateway.

func WithClientHTTPClient

func WithClientHTTPClient(c *http.Client) ClientOption

WithClientHTTPClient sets the HTTP client for the gateway connection.

func WithDescription

func WithDescription(desc string) ClientOption

WithDescription sets the subsystem description.

func WithGatewayAddr

func WithGatewayAddr(addr string) ClientOption

WithGatewayAddr sets the gateway server address to connect to.

func WithLocalEndpoint

func WithLocalEndpoint(endpoint string) ClientOption

WithLocalEndpoint sets the FlowWatch HTTP endpoint of this subsystem that the gateway will connect back to for proxying requests.

func WithSubsystemID

func WithSubsystemID(id string) ClientOption

WithSubsystemID sets the unique subsystem identifier.

func WithSubsystemName

func WithSubsystemName(name string) ClientOption

WithSubsystemName sets the human-readable subsystem name.

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor periodically checks subsystem connections and removes subsystems that have been stale beyond the grace period.

func NewHealthMonitor

func NewHealthMonitor(a *Adapter) *HealthMonitor

NewHealthMonitor creates a new health monitor. The check interval defaults to half the heartbeat interval for timely stale detection.

func (*HealthMonitor) Start

func (h *HealthMonitor) Start(ctx context.Context)

Start begins the background health check loop.

func (*HealthMonitor) Stop

func (h *HealthMonitor) Stop()

Stop waits for the health monitor to finish.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements the GatewayServiceHandler interface for subsystem registration via bidirectional streams.

func NewServer

func NewServer(adapter *Adapter) *Server

NewServer creates a new gateway registration server.

func (*Server) Register

Register handles a bidirectional registration stream from a subsystem.

type SubsystemInfo

type SubsystemInfo struct {
	SubsystemID   string
	SubsystemName string
	Description   string
	Endpoint      string
	Workflows     []*flowwatchv1.WorkflowDefinition
	Capabilities  *flowwatchv1.Capabilities
	Metadata      map[string]string
}

SubsystemInfo holds metadata about a registered subsystem.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL