server

package
v0.53.1-alpha.50 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2026 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrLocalPeerGone = errors.New("relay: local peer disappeared between HasPeer and dispatch")

ErrLocalPeerGone is returned by LocalPeerDispatcher when the peer disappeared between the cluster forwarder's HasPeer check and the dispatch — the asking pod will time out and re-broadcast on the next Lookup, same as today's race-recovery path.

Functions

This section is empty.

Types

type ClusterBootstrap

type ClusterBootstrap struct {
	Transport *cluster.Transport
	Locator   *cluster.PeerLocator
	Forwarder *cluster.Forwarder
	Discovery *cluster.Discovery
	Metrics   *cluster.Metrics
}

ClusterBootstrap holds the long-lived inter-pod components a relay needs in multi-pod mode (per ADR-0014). The fields are exported so operators / tests can inspect or instrument them, but the lifetime is owned by the bootstrap: call Stop to tear everything down in the right order.

func StartCluster

func StartCluster(ctx context.Context, st *store.Store, cfg ClusterBootstrapConfig) (*ClusterBootstrap, error)

StartCluster wires the cluster transport, locator, forwarder and discovery loop, returning the assembled bootstrap. The caller must hand cb.Forwarder to Server.SetCrossPodForwarder before any peer connections are accepted, so that local-store misses fall through to the cluster fabric.

On any wiring failure StartCluster cleans up partially-built state (transport listener, etc.) before returning the error.

func (*ClusterBootstrap) Stop

func (cb *ClusterBootstrap) Stop()

Stop tears down the cluster machinery in reverse construction order: stop discovery (no new outbound dials), then stop the transport (which closes every live stream and the listener), then unregister the metrics gauge callback. Safe on a nil receiver and idempotent.

type ClusterBootstrapConfig

type ClusterBootstrapConfig struct {
	// Headless is the FQDN (or in-cluster short name) of the K8s
	// Headless Service that resolves to every relay pod's IP.
	// Example: "openzro-relay-internal".
	Headless string

	// Port is the inter-pod TCP port. Same value on every pod.
	// Defaults to cluster.DefaultInterpodPort when zero.
	Port int

	// PodIP is the address other pods will dial us on, learned via
	// the K8s downward API (POD_IP). Required — without it we'd
	// announce a useless 0.0.0.0 in our HELLO frame and sibling
	// pods could never establish back-streams.
	PodIP string

	// AuthSecret authenticates the inter-pod HELLO via HMAC-SHA256.
	// Same value on every relay pod. Empty means unsigned HELLO —
	// only safe behind a NetworkPolicy that limits the cluster
	// port to relay pods. Logs a loud warning when empty.
	AuthSecret string

	// Meter, when non-nil, drives the relay_cluster_* metric set
	// (forwards by outcome, lookup latency, HELLO rejects by
	// reason, live stream count). Pass the same meter the rest of
	// the relay uses; nil keeps the cluster components silent.
	Meter metric.Meter

	// Interval is the discovery reconcile period. Defaults to
	// cluster.DefaultDiscoveryInterval (10 s) when zero.
	Interval time.Duration
}

ClusterBootstrapConfig is what the relay command needs to know about its K8s deployment to wire the inter-pod fabric. Headless, Port and PodIP are required; everything else has sensible defaults.

type Config

type Config struct {
	Meter          metric.Meter
	ExposedAddress string
	TLSSupport     bool
	AuthValidator  Validator

	// CrossPodForwarder, when non-nil, hands transport messages
	// destined to peers not connected to this pod to the cluster
	// fabric (per ADR-0014). Leave nil for single-pod deployments
	// — the relay then drops on local-store miss, same as before.
	CrossPodForwarder CrossPodForwarder
	// contains filtered or unexported fields
}

type CrossPodForwarder

type CrossPodForwarder interface {
	Forward(ctx context.Context, dst messages.PeerID, msg []byte) error

	// Locate answers "is this peer connected anywhere in the
	// cluster?" without sending data. Used by the subscribe path
	// (handleSubscribePeerState) so a peer on pod-A can discover
	// that the peer it wants to talk to is on pod-B and proceed to
	// open a relay connection, instead of silently timing out as
	// "peer not available". Returns ok=true when some pod in the
	// fabric responded with I_HAVE for the peer, ok=false on
	// timeout / no-owner.
	Locate(ctx context.Context, peer messages.PeerID) (pod string, ok bool)
}

CrossPodForwarder is the seam where the cluster transport plugs in (per ADR-0014). When a transport-msg arrives for a peer this pod doesn't own, the relay hands the bytes to the forwarder; the forwarder either delivers across the inter-pod fabric or returns an error indicating the peer is nowhere in the cluster.

In single-pod deployments this stays nil and the relay falls back to its legacy "drop on local-store miss" behaviour, byte-for-byte.

type ListenerConfig

type ListenerConfig struct {
	Address   string
	TLSConfig *tls.Config
}

ListenerConfig is the configuration for the listener. Address: the address to bind the listener to. It could be an address behind a reverse proxy. TLSConfig: the TLS configuration for the listener.

type LocalPeerDispatcher

type LocalPeerDispatcher struct {
	// contains filtered or unexported fields
}

LocalPeerDispatcher adapts the relay store to the cluster-side "local pod surface" contract. The cluster forwarder calls into this when:

  • a Lookup resolved to *this* pod (e.g. the asker hadn't seen the I_HAVE yet but local cache says we own it), or
  • a sibling pod sent us an MsgFwd whose dst peer ID is connected here.

In both cases we just write the transport-msg bytes to the local peer's TCP connection. The src field of the transport-msg has already been set by the originating pod, so the wire format the local peer sees is identical to a same-pod relay.

func NewLocalPeerDispatcher

func NewLocalPeerDispatcher(s *store.Store) *LocalPeerDispatcher

NewLocalPeerDispatcher wires the cluster's LocalDispatcher contract against the existing relay store.

func (*LocalPeerDispatcher) DispatchToLocal

func (d *LocalPeerDispatcher) DispatchToLocal(dst messages.PeerID, msg []byte) error

DispatchToLocal writes the transport message bytes to the peer identified by dst. Returns ErrLocalPeerGone if the peer disappeared between HasPeer and dispatch — the cluster forwarder turns that into a re-broadcast on the next Lookup.

func (*LocalPeerDispatcher) HasPeer

func (d *LocalPeerDispatcher) HasPeer(p messages.PeerID) bool

HasPeer reports whether the peer is connected to this pod.

type Peer

type Peer struct {
	// contains filtered or unexported fields
}

Peer represents a peer connection

func NewPeer

func NewPeer(metrics *metrics.Metrics, id messages.PeerID, conn net.Conn, store *store.Store, notifier *store.PeerNotifier, crossPodFwd CrossPodForwarder) *Peer

NewPeer creates a new Peer instance and prepare custom logging. crossPodFwd is optional: pass nil for single-pod deployments to keep the legacy "drop on local-store miss" path.

func (*Peer) Close

func (p *Peer) Close()

func (*Peer) CloseGracefully

func (p *Peer) CloseGracefully(ctx context.Context)

CloseGracefully closes the connection with the peer gracefully. Send a close message to the client and close the connection.

func (*Peer) ID

func (p *Peer) ID() messages.PeerID

func (*Peer) String

func (p *Peer) String() string

String returns the peer ID

func (*Peer) Work

func (p *Peer) Work()

Work reads data from the connection It manages the protocol (healthcheck, transport, close). Read the message and determine the message type and handle the message accordingly.

func (*Peer) Write

func (p *Peer) Write(b []byte) (int, error)

Write writes data to the connection

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay represents the relay server

func NewRelay

func NewRelay(config Config) (*Relay, error)

NewRelay creates and returns a new Relay instance.

Parameters:

config: A Config struct that holds the configuration needed to initialize the relay server.
  - Meter: A metric.Meter used for emitting metrics. If not set, a default no-op meter will be used.
  - ExposedAddress: The external address clients use to reach this relay. Required.
  - TLSSupport: A boolean indicating if the relay uses TLS. Affects the generated instance URL.
  - AuthValidator: A Validator implementation used to authenticate peers. Required.

Returns:

A pointer to a Relay instance and an error. If initialization is successful, the error will be nil;
otherwise, it will contain the reason the relay could not be created (e.g., invalid configuration).

func (*Relay) Accept

func (r *Relay) Accept(conn net.Conn)

Accept start to handle a new peer connection

func (*Relay) InstanceURL

func (r *Relay) InstanceURL() string

InstanceURL returns the instance URL of the relay server

func (*Relay) SetCrossPodForwarder

func (r *Relay) SetCrossPodForwarder(f CrossPodForwarder)

SetCrossPodForwarder installs a CrossPodForwarder for ADR-0014. Must be called before peer connections start arriving (i.e. before Listen). New peers accepted after this call route through the cluster fabric on local-store miss; existing peers (none, in the bootstrap-time use case) keep whatever forwarder was set when they were created.

func (*Relay) Shutdown

func (r *Relay) Shutdown(ctx context.Context)

Shutdown closes the relay server It closes the connection with all peers in gracefully and stops accepting new connections.

func (*Relay) Store

func (r *Relay) Store() *store.Store

Store exposes the relay's peer store so a caller wiring up the cluster fabric can hand it to NewLocalPeerDispatcher. There's no unrestricted mutation surface here — Store's API only lets callers look peers up; AddPeer/DeletePeer are driven by the relay itself.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main entry point for the relay server. It is the gate between the WebSocket listener and the Relay server logic. In a new HTTP connection, the server will accept the connection and pass it to the Relay server via the Accept method.

func NewServer

func NewServer(config Config) (*Server, error)

NewServer creates and returns a new relay server instance.

Parameters:

config: A Config struct containing the necessary configuration:
  - Meter: An OpenTelemetry metric.Meter used for recording metrics. If nil, a default no-op meter is used.
  - ExposedAddress: The public address (in domain:port format) used as the server's instance URL. Required.
  - TLSSupport: A boolean indicating whether TLS is enabled for the server.
  - AuthValidator: A Validator used to authenticate peers. Required.

Returns:

A pointer to a Server instance and an error. If the configuration is valid and initialization succeeds,
the returned error will be nil. Otherwise, the error will describe the problem.

func (*Server) InstanceURL

func (r *Server) InstanceURL() string

InstanceURL returns the instance URL of the relay server.

func (*Server) Listen

func (r *Server) Listen(cfg ListenerConfig) error

Listen starts the relay server.

func (*Server) SetCrossPodForwarder

func (r *Server) SetCrossPodForwarder(f CrossPodForwarder)

SetCrossPodForwarder hands the relay a cluster-fabric forwarder. Wire this before calling Listen so all accepted peers route cross-pod fall-through traffic correctly.

func (*Server) Shutdown

func (r *Server) Shutdown(ctx context.Context) error

Shutdown stops the relay server. If there are active connections, they will be closed gracefully. In case of a context, the connections will be forcefully closed.

func (*Server) Store

func (r *Server) Store() *store.Store

Store exposes the underlying peer store so multi-pod setups can hand it to the cluster fabric (per ADR-0014). The Store itself only offers safe lookup APIs to outside callers.

type Validator

type Validator interface {
	Validate(any) error
	// Deprecated: Use Validate instead.
	ValidateHelloMsgType(any) error
}

Directories

Path Synopsis
Package cluster implements coordinated multi-pod relay support for K8s deployments, per ADR-0014.
Package cluster implements coordinated multi-pod relay support for K8s deployments, per ADR-0014.
ws

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL