Documentation
Constants ¶
This section is empty.
Variables ¶
var ErrLocalPeerGone = errors.New("relay: local peer disappeared between HasPeer and dispatch")
ErrLocalPeerGone is returned by LocalPeerDispatcher when the peer disappeared between the cluster forwarder's HasPeer check and the dispatch — the asking pod will time out and re-broadcast on the next Lookup, same as today's race-recovery path.
Functions ¶
This section is empty.
Types ¶
type ClusterBootstrap ¶
type ClusterBootstrap struct {
Transport *cluster.Transport
Locator *cluster.PeerLocator
Forwarder *cluster.Forwarder
Discovery *cluster.Discovery
Metrics *cluster.Metrics
}
ClusterBootstrap holds the long-lived inter-pod components a relay needs in multi-pod mode (per ADR-0014). The fields are exported so operators / tests can inspect or instrument them, but the lifetime is owned by the bootstrap: call Stop to tear everything down in the right order.
func StartCluster ¶
func StartCluster(ctx context.Context, st *store.Store, cfg ClusterBootstrapConfig) (*ClusterBootstrap, error)
StartCluster wires the cluster transport, locator, forwarder and discovery loop, returning the assembled bootstrap. The caller must hand cb.Forwarder to Server.SetCrossPodForwarder before any peer connections are accepted, so that local-store misses fall through to the cluster fabric.
On any wiring failure StartCluster cleans up partially-built state (transport listener, etc.) before returning the error.
func (*ClusterBootstrap) Stop ¶
func (cb *ClusterBootstrap) Stop()
Stop tears down the cluster machinery in reverse construction order: stop discovery (no new outbound dials), then stop the transport (which closes every live stream and the listener), then unregister the metrics gauge callback. Safe on a nil receiver and idempotent.
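One common way to get a Stop that is both nil-safe and idempotent is a nil check followed by `sync.Once`. The sketch below shows that pattern under stated assumptions: `bootstrap` is a hypothetical stand-in for ClusterBootstrap, the teardown steps are recorded as strings instead of touching real components, and nothing here claims to be how the package actually implements Stop.

```go
package main

import "sync"

// bootstrap is a hypothetical stand-in for ClusterBootstrap. The real type
// holds cluster components; here the teardown is only recorded.
type bootstrap struct {
	once  sync.Once
	steps []string
}

// Stop runs the teardown at most once and tolerates a nil receiver.
func (b *bootstrap) Stop() {
	if b == nil {
		return // nothing was built, so there is nothing to tear down
	}
	b.once.Do(func() {
		// Order mirrors the documented sequence: discovery first,
		// then the transport, then the metrics callback.
		b.steps = append(b.steps, "discovery", "transport", "metrics")
	})
}
```

Calling Stop on a nil `*bootstrap` returns immediately, and calling it twice on a live value records the three steps only once.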
type ClusterBootstrapConfig ¶
type ClusterBootstrapConfig struct {
// Headless is the FQDN (or in-cluster short name) of the K8s
// Headless Service that resolves to every relay pod's IP.
// Example: "openzro-relay-internal".
Headless string
// Port is the inter-pod TCP port. Same value on every pod.
// Defaults to cluster.DefaultInterpodPort when zero.
Port int
// PodIP is the address other pods will dial us on, learned via
// the K8s downward API (POD_IP). Required — without it we'd
// announce a useless 0.0.0.0 in our HELLO frame and sibling
// pods could never establish back-streams.
PodIP string
// AuthSecret authenticates the inter-pod HELLO via HMAC-SHA256.
// Same value on every relay pod. Empty means unsigned HELLO —
// only safe behind a NetworkPolicy that limits the cluster
// port to relay pods. Logs a loud warning when empty.
AuthSecret string
// Meter, when non-nil, drives the relay_cluster_* metric set
// (forwards by outcome, lookup latency, HELLO rejects by
// reason, live stream count). Pass the same meter the rest of
// the relay uses; nil keeps the cluster components silent.
Meter metric.Meter
// Interval is the discovery reconcile period. Defaults to
// cluster.DefaultDiscoveryInterval (10 s) when zero.
Interval time.Duration
}
ClusterBootstrapConfig is what the relay command needs to know about its K8s deployment to wire the inter-pod fabric. Headless and PodIP are required. Port falls back to cluster.DefaultInterpodPort and Interval to cluster.DefaultDiscoveryInterval when zero; AuthSecret and Meter are optional.
type Config ¶
type Config struct {
Meter metric.Meter
ExposedAddress string
TLSSupport bool
AuthValidator Validator
// CrossPodForwarder, when non-nil, hands transport messages
// destined to peers not connected to this pod to the cluster
// fabric (per ADR-0014). Leave nil for single-pod deployments
// — the relay then drops on local-store miss, same as before.
CrossPodForwarder CrossPodForwarder
// contains filtered or unexported fields
}
type CrossPodForwarder ¶
type CrossPodForwarder interface {
Forward(ctx context.Context, dst messages.PeerID, msg []byte) error
// Locate answers "is this peer connected anywhere in the
// cluster?" without sending data. Used by the subscribe path
// (handleSubscribePeerState) so a peer on pod-A can discover
// that the peer it wants to talk to is on pod-B and proceed to
// open a relay connection, instead of silently timing out as
// "peer not available". Returns ok=true when some pod in the
// fabric responded with I_HAVE for the peer, ok=false on
// timeout / no-owner.
Locate(ctx context.Context, peer messages.PeerID) (pod string, ok bool)
}
CrossPodForwarder is the seam where the cluster transport plugs in (per ADR-0014). When a transport-msg arrives for a peer this pod doesn't own, the relay hands the bytes to the forwarder; the forwarder either delivers across the inter-pod fabric or returns an error indicating the peer is nowhere in the cluster.
In single-pod deployments this stays nil and the relay falls back to its legacy "drop on local-store miss" behaviour, byte-for-byte.
type ListenerConfig ¶
ListenerConfig is the configuration for the listener.
- Address: the address to bind the listener to. It may be an address behind a reverse proxy.
- TLSConfig: the TLS configuration for the listener.
type LocalPeerDispatcher ¶
type LocalPeerDispatcher struct {
// contains filtered or unexported fields
}
LocalPeerDispatcher adapts the relay store to the cluster-side "local pod surface" contract. The cluster forwarder calls into this when:
- a Lookup resolved to *this* pod (e.g. the asker hadn't seen the I_HAVE yet but local cache says we own it), or
- a sibling pod sent us an MsgFwd whose dst peer ID is connected here.
In both cases we just write the transport-msg bytes to the local peer's TCP connection. The src field of the transport-msg has already been set by the originating pod, so the wire format the local peer sees is identical to a same-pod relay.
func NewLocalPeerDispatcher ¶
func NewLocalPeerDispatcher(s *store.Store) *LocalPeerDispatcher
NewLocalPeerDispatcher wires the cluster's LocalDispatcher contract against the existing relay store.
func (*LocalPeerDispatcher) DispatchToLocal ¶
func (d *LocalPeerDispatcher) DispatchToLocal(dst messages.PeerID, msg []byte) error
DispatchToLocal writes the transport message bytes to the peer identified by dst. Returns ErrLocalPeerGone if the peer disappeared between HasPeer and dispatch — the cluster forwarder turns that into a re-broadcast on the next Lookup.
type Peer ¶
type Peer struct {
// contains filtered or unexported fields
}
Peer represents a peer connection.
func NewPeer ¶
func NewPeer(metrics *metrics.Metrics, id messages.PeerID, conn net.Conn, store *store.Store, notifier *store.PeerNotifier, crossPodFwd CrossPodForwarder) *Peer
NewPeer creates a new Peer instance and prepares custom logging. crossPodFwd is optional: pass nil for single-pod deployments to keep the legacy "drop on local-store miss" path.
func (*Peer) CloseGracefully ¶
CloseGracefully closes the connection with the peer gracefully. It sends a close message to the client and then closes the connection.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay represents the relay server.
func NewRelay ¶
NewRelay creates and returns a new Relay instance.
Parameters:
config: A Config struct that holds the configuration needed to initialize the relay server.
- Meter: A metric.Meter used for emitting metrics. If not set, a default no-op meter will be used.
- ExposedAddress: The external address clients use to reach this relay. Required.
- TLSSupport: A boolean indicating if the relay uses TLS. Affects the generated instance URL.
- AuthValidator: A Validator implementation used to authenticate peers. Required.
Returns:
A pointer to a Relay instance and an error. If initialization is successful, the error will be nil; otherwise, it will contain the reason the relay could not be created (e.g., invalid configuration).
func (*Relay) InstanceURL ¶
InstanceURL returns the instance URL of the relay server.
func (*Relay) SetCrossPodForwarder ¶
func (r *Relay) SetCrossPodForwarder(f CrossPodForwarder)
SetCrossPodForwarder installs a CrossPodForwarder for ADR-0014. Must be called before peer connections start arriving (i.e. before Listen). New peers accepted after this call route through the cluster fabric on local-store miss; existing peers (none, in the bootstrap-time use case) keep whatever forwarder was set when they were created.
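The ordering requirement follows from peers capturing the forwarder at creation time. The sketch below illustrates that with toy types; `toyRelay`, `toyPeer`, `named` and the one-method `forwarder` interface are hypothetical and much simpler than the real Relay, Peer and CrossPodForwarder.

```go
package main

// forwarder is a minimal stand-in for CrossPodForwarder.
type forwarder interface {
	Name() string
}

// named is a trivial forwarder that only carries a label.
type named string

func (n named) Name() string { return string(n) }

// toyRelay holds the forwarder that newly accepted peers will receive.
type toyRelay struct {
	fwd forwarder
}

// toyPeer keeps whatever forwarder the relay had when it was accepted.
type toyPeer struct {
	fwd forwarder
}

func (r *toyRelay) SetCrossPodForwarder(f forwarder) { r.fwd = f }

// accept copies the relay's current forwarder into the new peer, so a
// later SetCrossPodForwarder call does not reach this peer.
func (r *toyRelay) accept() *toyPeer {
	return &toyPeer{fwd: r.fwd}
}
```

A peer accepted before SetCrossPodForwarder keeps a nil forwarder and therefore the drop-on-miss path, which is why the call belongs before Listen.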
func (*Relay) Shutdown ¶
Shutdown closes the relay server. It gracefully closes the connections with all peers and stops accepting new connections.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main entry point for the relay server. It is the gate between the WebSocket listener and the Relay server logic. When a new HTTP connection arrives, the server accepts it and passes it to the Relay server via the Accept method.
func NewServer ¶
NewServer creates and returns a new relay server instance.
Parameters:
config: A Config struct containing the necessary configuration:
- Meter: An OpenTelemetry metric.Meter used for recording metrics. If nil, a default no-op meter is used.
- ExposedAddress: The public address (in domain:port format) used as the server's instance URL. Required.
- TLSSupport: A boolean indicating whether TLS is enabled for the server.
- AuthValidator: A Validator used to authenticate peers. Required.
Returns:
A pointer to a Server instance and an error. If the configuration is valid and initialization succeeds, the returned error will be nil. Otherwise, the error will describe the problem.
func (*Server) InstanceURL ¶
InstanceURL returns the instance URL of the relay server.
func (*Server) Listen ¶
func (r *Server) Listen(cfg ListenerConfig) error
Listen starts the relay server.
func (*Server) SetCrossPodForwarder ¶
func (r *Server) SetCrossPodForwarder(f CrossPodForwarder)
SetCrossPodForwarder hands the relay a cluster-fabric forwarder. Wire this before calling Listen so all accepted peers route cross-pod fall-through traffic correctly.