peermanagement

package
v0.0.0-...-2b6a5f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2026 License: ISC Imports: 43 Imported by: 0

Documentation

Overview

Package peermanagement implements XRPL peer-to-peer networking.

Index

Constants

View Source
const (
	DefaultListenAddr  = ":51235"
	DefaultMaxPeers    = 50
	DefaultMaxInbound  = 25
	DefaultMaxOutbound = 25

	DefaultConnectTimeout   = 10 * time.Second
	DefaultHandshakeTimeout = 5 * time.Second
	DefaultPingInterval     = 30 * time.Second
	DefaultIdleTimeout      = 2 * time.Minute

	DefaultEventBufferSize   = 256
	DefaultMessageBufferSize = 256
	DefaultSendBufferSize    = 64

	DefaultUserAgent = "goXRPL/0.1.0"
)

Default configuration values.

View Source
const (
	DefaultBootCacheFile   = "peerfinder.cache"
	MaxCachedEndpoints     = 1000
	CacheEntryTTL          = 7 * 24 * time.Hour
	RecentEndpointTTL      = 5 * time.Minute
	MaxHops                = 3
	DefaultReservationFile = "peer_reservations.json"
)

Discovery constants.

View Source
const (
	HeaderUpgrade          = "Upgrade"
	HeaderConnection       = "Connection"
	HeaderConnectAs        = "Connect-As"
	HeaderPublicKey        = "Public-Key"
	HeaderSessionSignature = "Session-Signature"
	HeaderNetworkID        = "Network-ID"
	HeaderNetworkTime      = "Network-Time"
	HeaderClosedLedger     = "Closed-Ledger"
	HeaderPreviousLedger   = "Previous-Ledger"
	HeaderCrawl            = "Crawl"
	HeaderUserAgent        = "User-Agent"
	HeaderInstanceCookie   = "Instance-Cookie"
	HeaderServerDomain     = "Server-Domain"
	HeaderRemoteIP         = "Remote-IP"
	HeaderLocalIP          = "Local-IP"
	HeaderServer           = "Server"
)
View Source
const (
	HeaderProtocolCtl = "X-Protocol-Ctl"

	FeatureNameCompr        = "compr"
	FeatureNameVPRR         = "vprr"
	FeatureNameTXRR         = "txrr"
	FeatureNameLedgerReplay = "ledgerreplay"

	FeatureDelimiter = ";"
	ValueDelimiter   = ","
)

X-Protocol-Ctl: feature1=v1,v2;feature2=v3

View Source
const (
	NodePublicKeyPrefix    = 0x1C // base58 'n' prefix
	AccountPublicKeyPrefix = 0x23 // base58 'a' prefix
	CompressedPubKeyLen    = 33
	ChecksumLen            = 4
)
View Source
const (
	// DefaultRequestTimeout is the default timeout for ledger data requests.
	DefaultRequestTimeout = 30 * time.Second

	// MaxConcurrentRequests is the maximum number of concurrent requests per peer.
	MaxConcurrentRequests = 5

	// MaxReplayDeltaResponseBytes caps the total uncompressed payload size of
	// a single mtREPLAY_DELTA_RESPONSE we will emit. Rippled does not enforce
	// an upstream cap, but our framing layer enforces its own limit
	// (message.MaxMessageSize = 64 MiB) and any response above that boundary
	// would be dropped at the codec layer. A 16 MiB ceiling leaves comfortable
	// headroom for the wire envelope and protects the event channel from
	// arbitrarily large allocations driven by remote requests.
	MaxReplayDeltaResponseBytes = 16 * 1024 * 1024
)

Ledger sync constants.

View Source
const (
	TypeUnknown                 = message.TypeUnknown
	TypeManifests               = message.TypeManifests
	TypePing                    = message.TypePing
	TypeCluster                 = message.TypeCluster
	TypeEndpoints               = message.TypeEndpoints
	TypeTransaction             = message.TypeTransaction
	TypeGetLedger               = message.TypeGetLedger
	TypeLedgerData              = message.TypeLedgerData
	TypeProposeLedger           = message.TypeProposeLedger
	TypeStatusChange            = message.TypeStatusChange
	TypeHaveSet                 = message.TypeHaveSet
	TypeValidation              = message.TypeValidation
	TypeGetObjects              = message.TypeGetObjects
	TypeValidatorList           = message.TypeValidatorList
	TypeSquelch                 = message.TypeSquelch
	TypeValidatorListCollection = message.TypeValidatorListCollection
	TypeProofPathReq            = message.TypeProofPathReq
	TypeProofPathResponse       = message.TypeProofPathResponse
	TypeReplayDeltaReq          = message.TypeReplayDeltaReq
	TypeReplayDeltaResponse     = message.TypeReplayDeltaResponse
	TypeHaveTransactions        = message.TypeHaveTransactions
	TypeTransactions            = message.TypeTransactions
)

Re-export message type constants.

View Source
const (
	AlgorithmNone = message.AlgorithmNone
	AlgorithmLZ4  = message.AlgorithmLZ4
)

Re-export compression algorithm constants.

View Source
const (
	HeaderSizeUncompressed = message.HeaderSizeUncompressed
	HeaderSizeCompressed   = message.HeaderSizeCompressed
	MaxMessageSize         = message.MaxMessageSize
)

Re-export header size constants.

View Source
const (
	// MinUnsquelchExpire is the minimum squelch duration (5 minutes).
	MinUnsquelchExpire = 300 * time.Second

	// MaxUnsquelchExpireDefault is the default upper bound for the squelch
	// duration when there are few peers (10 minutes).
	MaxUnsquelchExpireDefault = 600 * time.Second

	// SquelchPerPeer is the per-peer contribution to the upper squelch bound.
	SquelchPerPeer = 10 * time.Second

	// MaxUnsquelchExpirePeers is the absolute upper bound for any squelch
	// duration (1 hour).
	MaxUnsquelchExpirePeers = 3600 * time.Second

	// Idled is the no-message-received threshold before a peer is treated
	// as idle for selection purposes.
	Idled = 8 * time.Second

	// MinMessageThreshold is the per-peer message count needed before a
	// peer becomes a selection candidate.
	MinMessageThreshold = 19

	// MaxMessageThreshold is the per-peer message count that, when reached
	// by MaxSelectedPeers peers, triggers selection.
	MaxMessageThreshold = 20

	// MaxSelectedPeers is the maximum number of peers chosen as the source
	// of validator messages per slot.
	MaxSelectedPeers = 5

	// WaitOnBootup is the grace period after start-up before reduce-relay
	// engages, to let the node establish its peer connections.
	WaitOnBootup = 10 * time.Minute

	// MaxTxQueueSize caps the aggregated transaction-hash queue per peer
	// so a TMTransactions message stays within the 64MB protocol limit.
	MaxTxQueueSize = 10000
)

Reduce-relay constants. These mirror rippled's `src/xrpld/overlay/ReduceRelayCommon.h` (namespace `reduce_relay`).

A peer's squelch is bounded in time to:

rand{MinUnsquelchExpire, max_squelch}

where:

max_squelch = min(max(MaxUnsquelchExpireDefault, SquelchPerPeer * num_peers),
                  MaxUnsquelchExpirePeers)

See: https://xrpl.org/blog/2021/message-routing-optimizations-pt-1-proposal-validation-relaying.html

View Source
const EvictBadDataThreshold = 25000

EvictBadDataThreshold is the bad-data BALANCE at which the overlay disconnects a peer. IncBadData adds a per-reason weight (see BadDataWeight) and a background decay halves the balance every badDataDecayInterval — together this approximates rippled's Resource::Consumer model: persistent offenders accumulate faster than decay and evict; chatty-but-honest peers decay below threshold. 25000 matches rippled's dropThreshold constant in src/xrpld/overlay/Resource/impl/Tuning.h:30-40 (NOT its much lower minGossipBalance — those are distinct gates). Calibration: a peer sending one genuinely-corrupt message per decay window (weightInvalidData=400) asymptotically approaches 800 (below the 25000 threshold), so sporadic offenders survive; sustained multi-hundred-per-window abuse crosses the threshold within seconds.

View Source
const FeatureReduceRelay = FeatureVpReduceRelay

FeatureReduceRelay is a legacy alias for FeatureVpReduceRelay.

View Source
const (
	// MinCompressibleSize is the minimum message size worth compressing.
	MinCompressibleSize = 70
)

Compression constants.

View Source
const NetworkClockTolerance = 20 * time.Second
View Source
const RelayedIndexMaxEntries = 4096

RelayedIndexMaxEntries caps memory for the reverse index under adversarial traffic. Sized to match the adaptor's dedup cap so both age out together under sustained churn.

View Source
const RelayedIndexTTL = 30 * time.Second

RelayedIndexTTL bounds how long a suppression-key → peers entry is kept in the reverse index. Must match the consensus router's messageDedupTTL so that a hash remains queryable for as long as the router may observe duplicates for it. If the index expired before the dedup window, a duplicate hitting router.handleProposal could find no "peers that have the message" entry and under-feed the slot — the exact bug B3 was filed to fix.

Variables

View Source
var (
	// Connection errors
	ErrMaxPeersReached    = errors.New("maximum peers reached")
	ErrMaxInboundReached  = errors.New("maximum inbound connections reached")
	ErrMaxOutboundReached = errors.New("maximum outbound connections reached")
	ErrAlreadyConnected   = errors.New("already connected to peer")
	ErrSelfConnection     = errors.New("cannot connect to self")
	ErrSlotUnavailable    = errors.New("no connection slot available")
	ErrConnectionClosed   = errors.New("connection closed")
	ErrSendBufferFull     = errors.New("peer send buffer full")
	ErrPingTimeout        = errors.New("peer ping timeout")
	ErrLargeSendQueue     = errors.New("peer send queue saturated; closing")

	// Handshake errors
	ErrHandshakeFailed  = errors.New("handshake failed")
	ErrInvalidHandshake = errors.New("invalid handshake data")
	ErrHandshakeTimeout = errors.New("handshake timeout")
	ErrProtocolMismatch = errors.New("protocol version mismatch")
	ErrInvalidPublicKey = errors.New("invalid public key")
	ErrInvalidSignature = errors.New("invalid signature")
	ErrNetworkMismatch  = errors.New("network ID mismatch")

	// Discovery errors
	ErrPeerNotFound    = errors.New("peer not found")
	ErrInvalidEndpoint = errors.New("invalid endpoint")
	ErrEndpointBanned  = errors.New("endpoint is banned")

	// Message errors
	ErrInvalidMessage   = errors.New("invalid message")
	ErrMessageTooLarge  = errors.New("message too large")
	ErrUnknownMessage   = errors.New("unknown message type")
	ErrDecodeFailed     = errors.New("failed to decode message")
	ErrEncodeFailed     = errors.New("failed to encode message")
	ErrDecompressFailed = errors.New("failed to decompress message")

	// Lifecycle errors
	ErrNotRunning = errors.New("overlay not running")
	ErrShutdown   = errors.New("overlay is shutting down")
)

Sentinel errors for peer management operations.

View Source
var (
	// ErrLedgerNotFound signals the requested ledger is unknown to the
	// provider or not yet immutable. The handler responds with
	// ReplyErrorNoLedger.
	ErrLedgerNotFound = errors.New("ledger not found")
	// ErrKeyNotFound signals the ledger exists but the requested key has
	// no leaf in the selected map. The handler responds with
	// ReplyErrorNoNode.
	ErrKeyNotFound = errors.New("key not found in ledger map")
	// ErrPeerBadRequest is returned by LedgerSyncHandler.HandleMessage
	// when the inbound request was malformed (e.g. bad field lengths,
	// invalid enum values) and we replied with ReplyErrorBadRequest. The
	// overlay dispatcher uses it to attribute the failure to the
	// originating peer via IncPeerBadData. Mirrors rippled's
	// fee.update(feeInvalidData) path for reBAD_REQUEST replies.
	ErrPeerBadRequest = errors.New("peer sent bad request")
)

Sentinel errors returned by LedgerProvider.GetProofPath so the handler can map them back to the appropriate TMReplyError code on the wire.

Mirrors rippled's reNO_LEDGER (ledger unknown / not yet immutable) and reNO_NODE (the requested key is not present in the selected map) at rippled/src/xrpld/app/ledger/detail/LedgerReplayMsgHandler.cpp:62-90.

View Source
var (
	ErrInvalidPrivateKey = errors.New("invalid private key")
)

Functions

func BadDataWeight

func BadDataWeight(reason string) int

BadDataWeight returns the weight to charge IncBadData for `reason`. Maps reason labels to rippled's fee tiers. Unrecognized reasons fall back to weightDefaultBadData so a new bad-data source doesn't accidentally ship with zero weight.

Classification rationale per rippled PeerImp.cpp:

  • sig-size / pubkey-size → feeInvalidSignature (PeerImp.cpp:1683-1686)
  • ledger-hash / txset / prev-ledger-size / node-id-zero → feeMalformedRequest (PeerImp.cpp:1693: "bad hashes")
  • verify / decode / wire-corruption → feeInvalidData
  • no-reply → feeRequestNoReply

func BuildHandshakeErrorResponse

func BuildHandshakeErrorResponse(userAgent, remoteAddr, text string) *http.Response

BuildHandshakeErrorResponse mirrors rippled OverlayImpl::makeErrorResponse (OverlayImpl.cpp:371-386). rippled returns 400 Bad Request — not 426 Upgrade Required — with the failure reason embedded in the status line as "Bad Request (<text>)" so a misconfigured peer can read why the upgrade was refused before the connection is closed.

func BuildHandshakeRequest

func BuildHandshakeRequest(id *Identity, sharedValue []byte, cfg HandshakeConfig) (*http.Request, error)

BuildHandshakeRequest builds an HTTP upgrade request for peer connection.

func BuildHandshakeResponse

func BuildHandshakeResponse(id *Identity, sharedValue []byte, cfg HandshakeConfig, negotiated string) *http.Response

BuildHandshakeResponse mirrors rippled makeResponse (Handshake.cpp:391-422). `negotiated` is the version returned by NegotiateProtocolVersion against the inbound request; an empty value falls back to the highest supported version (test convenience).

func CompressIfWorthwhile

func CompressIfWorthwhile(msgType uint16, data []byte) ([]byte, bool)

CompressIfWorthwhile compresses data if it would be beneficial.

func CompressLZ4

func CompressLZ4(data []byte) ([]byte, error)

CompressLZ4 compresses data using LZ4. Returns the compressed data or nil if compression wouldn't save space.

func DecompressLZ4

func DecompressLZ4(compressed []byte, uncompressedSize int) ([]byte, error)

DecompressLZ4 decompresses LZ4 compressed data.

func EncodeMessage

func EncodeMessage(msg Message) ([]byte, error)

EncodeMessage encodes a message to bytes using protobuf.

func EncodeMessageHeader

func EncodeMessageHeader(buf []byte, payloadSize uint32, msgType MessageType, algorithm CompressionAlgorithm, uncompressedSize uint32) error

EncodeMessageHeader encodes a message header into the provided buffer.

func FeatureEnabled

func FeatureEnabled(headers http.Header, feature string) bool

func GenerateSeed

func GenerateSeed() ([]byte, error)

func GetFeatureValue

func GetFeatureValue(headers http.Header, feature string) (string, bool)

func IsFeatureValue

func IsFeatureValue(headers http.Header, feature, value string) bool

IsFeatureValue reports whether feature's comma-separated value list contains value.

func MakeFeaturesRequestHeader

func MakeFeaturesRequestHeader(comprEnabled, ledgerReplayEnabled, txReduceRelayEnabled, vpReduceRelayEnabled bool) string

MakeFeaturesRequestHeader builds the X-Protocol-Ctl value for a request.

func MakeFeaturesResponseHeader

func MakeFeaturesResponseHeader(requestHeaders http.Header, comprEnabled, ledgerReplayEnabled, txReduceRelayEnabled, vpReduceRelayEnabled bool) string

MakeFeaturesResponseHeader echoes back only features that are both locally enabled AND requested by the peer.

func NegotiateProtocolVersion

func NegotiateProtocolVersion(upgradeHeader string) string

NegotiateProtocolVersion picks the largest version in the intersection of the peer's offered Upgrade list and supportedProtocols, or "" if no shared version exists. Use on the INBOUND path where the request advertises a list. Mirrors rippled negotiateProtocolVersion (ProtocolVersion.cpp:127-156).

func PeerFeatureEnabled

func PeerFeatureEnabled(headers http.Header, feature, value string, localEnabled bool) bool

func ShouldCompress

func ShouldCompress(msgType uint16) bool

ShouldCompress returns true if the message type should be compressed.

func SupportedProtocolVersions

func SupportedProtocolVersions() string

SupportedProtocolVersions returns the comma-joined Upgrade header value goXRPL advertises. Mirrors rippled supportedProtocolVersions() (ProtocolVersion.cpp:158-174).

func ValidateServerDomain

func ValidateServerDomain(headers http.Header) (string, error)

ValidateServerDomain enforces verifyHandshake's Server-Domain check (Handshake.cpp:235-239). Run first to match rippled's verify order (Server-Domain → Network-ID → Network-Time → Public-Key → ...).

func VerifyOutboundProtocolVersion

func VerifyOutboundProtocolVersion(upgradeHeader string) string

VerifyOutboundProtocolVersion accepts the server's Upgrade response only if it contains exactly one supported version, returning that version's token. Returns "" otherwise (zero, multiple, or unsupported). Mirrors rippled ConnectAttempt.cpp:340-351.

func WriteMessage

func WriteMessage(w io.Writer, msgType MessageType, payload []byte) error

WriteMessage writes a message with header to the writer.

func WriteMessageCompressed

func WriteMessageCompressed(w io.Writer, msgType MessageType, payload []byte, algorithm CompressionAlgorithm, uncompressedSize uint32) error

WriteMessageCompressed writes a potentially compressed message.

func WriteRawHandshakeRequest

func WriteRawHandshakeRequest(w io.Writer, req *http.Request) error

WriteRawHandshakeRequest writes the request without the extra headers (Host, Content-Length, ...) that http.Request.Write adds — rippled's parser rejects them.

Types

type BootCache

type BootCache struct {
	// contains filtered or unexported fields
}

BootCache persists known peer addresses across restarts.

func NewBootCache

func NewBootCache(dataDir string) *BootCache

NewBootCache creates a new boot cache.

func (*BootCache) GetEndpoints

func (bc *BootCache) GetEndpoints(limit int) []*CachedEndpoint

GetEndpoints returns endpoints sorted by valence.

func (*BootCache) Insert

func (bc *BootCache) Insert(address string, port uint16)

Insert adds or updates an endpoint in the cache.

func (*BootCache) Load

func (bc *BootCache) Load() error

Load loads the cache from disk.

func (*BootCache) MarkFailed

func (bc *BootCache) MarkFailed(address string)

MarkFailed records a connection failure.

func (*BootCache) MarkSuccess

func (bc *BootCache) MarkSuccess(address string)

MarkSuccess records a successful connection.

func (*BootCache) Save

func (bc *BootCache) Save() error

Save writes the cache to disk.

type CachedEndpoint

type CachedEndpoint struct {
	Address    string    `json:"address"`
	Port       uint16    `json:"port"`
	LastSeen   time.Time `json:"last_seen"`
	Valence    int       `json:"valence"`
	FailCount  int       `json:"fail_count"`
	LastFailed time.Time `json:"last_failed,omitempty"`
}

CachedEndpoint represents a cached peer endpoint.

type CompressionAlgorithm

type CompressionAlgorithm = message.CompressionAlgorithm

CompressionAlgorithm represents a compression algorithm.

type Config

type Config struct {
	// Network settings
	ListenAddr string
	NetworkID  uint32
	UserAgent  string

	// Peer limits
	MaxPeers    int
	MaxInbound  int
	MaxOutbound int

	// Bootstrap peers
	BootstrapPeers []string
	FixedPeers     []string

	// Privacy
	PrivateMode bool // Don't share our address with peers

	// Storage
	DataDir string // For boot cache persistence

	// Timeouts
	ConnectTimeout   time.Duration
	HandshakeTimeout time.Duration
	PingInterval     time.Duration
	IdleTimeout      time.Duration

	// Buffer sizes
	EventBufferSize   int
	MessageBufferSize int
	SendBufferSize    int

	// Features — advertised via X-Protocol-Ctl during handshake so
	// peers know which optional protocol extensions we speak. Matches
	// rippled's compr / vprr / txrr / ledgerreplay feature toggles.
	//
	// All three reduce-relay flags default to false to match rippled
	// (Config.h:248 sets VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE=false,
	// Config.h:258 sets TX_REDUCE_RELAY_ENABLE=false, Config.cpp:755-762
	// preserves false when the cfg omits the section). Reduce-relay
	// is opt-in: an operator must explicitly set one of these flags
	// (or WithReduceRelay(true)) to advertise vprr/txrr and activate
	// the slot-squelching engine. Shipping with the flags on would
	// cause a stock goXRPL node to squelch traffic on a stock rippled
	// network where the peer majority does not reciprocate.
	//
	// EnableReduceRelay is a legacy alias that enables BOTH vprr and
	// txrr at once. New code should set EnableVPReduceRelay and
	// EnableTxReduceRelay independently so an operator can run one
	// without the other — matches rippled's Handshake.cpp handling of
	// the two features as independent toggles. When EnableReduceRelay
	// is set, it is propagated to both at Validate() time if either
	// specific toggle is still false.
	EnableReduceRelay   bool
	EnableVPReduceRelay bool
	EnableTxReduceRelay bool
	EnableCompression   bool
	EnableLedgerReplay  bool

	// LocalValidatorPubKey is the compressed secp256k1 public key (33
	// bytes) of the local validator identity, when this node is acting
	// as a validator. Nil/empty for observer nodes. Used by
	// handleSquelchMessage to drop inbound TMSquelch frames that target
	// our own validator — otherwise a hostile peer could silence our
	// own proposals/validations on the RelayFromValidator path.
	// Matches rippled PeerImp.cpp:2715-2721.
	LocalValidatorPubKey []byte

	// ClusterNodes lists base58-encoded node public keys (with an
	// optional trailing comment as the human-readable name) for peers
	// that should be treated as cluster members. Mirrors the
	// [cluster_nodes] section in rippled.cfg. Parsed by
	// cluster.Registry.Load at construction time; a malformed entry
	// fails Overlay startup, matching rippled Application init which
	// aborts when Cluster::load returns false.
	ClusterNodes []string

	// ServerDomain populates the Server-Domain header; "" suppresses it.
	ServerDomain string
	// PublicIP populates Local-IP and gates the Remote-IP consistency
	// check; nil suppresses both.
	PublicIP net.IP

	// Clock function for testing
	Clock func() time.Time
}

Config holds the configuration for the overlay network.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for invalid values.

type DiscoveredPeer

type DiscoveredPeer struct {
	Address   string
	Hops      uint32
	LastSeen  time.Time
	Connected bool
	PeerID    PeerID
	Source    PeerID
}

DiscoveredPeer stores information about a discovered peer.

type Discovery

type Discovery struct {
	// contains filtered or unexported fields
}

Discovery manages peer discovery and connection maintenance.

func NewDiscovery

func NewDiscovery(cfg *Config, events chan<- Event) *Discovery

NewDiscovery creates a new Discovery instance.

func (*Discovery) AddPeer

func (d *Discovery) AddPeer(address string, hops uint32, source PeerID)

AddPeer adds a discovered peer.

func (*Discovery) ConnectedCount

func (d *Discovery) ConnectedCount() int

ConnectedCount returns the number of connected peers.

func (*Discovery) MarkConnected

func (d *Discovery) MarkConnected(address string, peerID PeerID)

MarkConnected marks a peer as connected.

func (*Discovery) MarkDisconnected

func (d *Discovery) MarkDisconnected(peerID PeerID)

MarkDisconnected marks a peer as disconnected.

func (*Discovery) NeedsMorePeers

func (d *Discovery) NeedsMorePeers() bool

NeedsMorePeers returns true if we should connect to more peers.

func (*Discovery) SelectPeersToConnect

func (d *Discovery) SelectPeersToConnect(count int) []string

SelectPeersToConnect returns candidate addresses to connect to.

func (*Discovery) Start

func (d *Discovery) Start(ctx context.Context) error

Start starts the discovery service.

func (*Discovery) Stop

func (d *Discovery) Stop()

Stop stops the discovery service.

type Endpoint

type Endpoint struct {
	Host string
	Port uint16
}

Endpoint represents a network address for a peer.

func ParseEndpoint

func ParseEndpoint(s string) (Endpoint, error)

ParseEndpoint parses an endpoint from "host:port" string.

func (Endpoint) String

func (e Endpoint) String() string

String returns the endpoint as "host:port".

type Event

type Event struct {
	// Type is the event type.
	Type EventType

	// PeerID is the peer this event relates to (if applicable).
	PeerID PeerID

	// Endpoint is the peer's endpoint (for connection events).
	Endpoint Endpoint

	// PublicKey is the peer's public key (after handshake).
	PublicKey []byte

	// MessageType is the type of message (for MessageReceived events).
	MessageType uint16

	// Payload is the message payload (for MessageReceived events).
	Payload []byte

	// Endpoints is a list of endpoints (for EndpointsReceived events).
	Endpoints []Endpoint

	// Inbound indicates if this is an inbound connection.
	Inbound bool

	// Error is set for failure events.
	Error error
}

Event represents a peer management event for internal coordination.

type EventType

type EventType int

EventType represents the type of peer management event.

const (
	// EventPeerConnecting is emitted when starting to connect to a peer.
	EventPeerConnecting EventType = iota

	// EventPeerConnected is emitted when TCP connection is established.
	EventPeerConnected

	// EventPeerHandshakeComplete is emitted when handshake succeeds.
	EventPeerHandshakeComplete

	// EventPeerActivated is emitted when peer becomes fully active.
	EventPeerActivated

	// EventPeerDisconnected is emitted when peer disconnects.
	EventPeerDisconnected

	// EventPeerFailed is emitted when connection attempt fails.
	EventPeerFailed

	// EventMessageReceived is emitted when a message is received from a peer.
	EventMessageReceived

	// EventEndpointsReceived is emitted when peer endpoints are received.
	EventEndpointsReceived

	// EventLedgerResponse is emitted when ledger data needs to be sent to a peer.
	EventLedgerResponse
)

func (EventType) String

func (e EventType) String() string

String returns the string representation of an EventType.

type Feature

type Feature int
const (
	FeatureValidatorListPropagation Feature = iota
	FeatureLedgerReplay
	FeatureCompression
	// vprr — validator-proposal reduce-relay (gates TMSquelch).
	FeatureVpReduceRelay
	// txrr — transaction reduce-relay. Independent of vprr.
	FeatureTxReduceRelay
	FeatureTransactionBatching
)

func ParseFeature

func ParseFeature(s string) (Feature, bool)

ParseFeature accepts the legacy "reduceRelay" alias plus vprr/txrr.

func (Feature) String

func (f Feature) String() string

type FeatureSet

type FeatureSet struct {
	// contains filtered or unexported fields
}

FeatureSet represents a set of supported features.

func DefaultFeatureSet

func DefaultFeatureSet() *FeatureSet

func NewFeatureSet

func NewFeatureSet() *FeatureSet

func ParseProtocolCtlFeatures

func ParseProtocolCtlFeatures(headers http.Header) *FeatureSet

ParseProtocolCtlFeatures decodes the negotiated capabilities. txrr and vprr are tracked independently — they gate different behaviour (tx relay vs TMSquelch) and operators can enable one without the other.

func (*FeatureSet) Disable

func (fs *FeatureSet) Disable(f Feature)

func (*FeatureSet) Enable

func (fs *FeatureSet) Enable(f Feature)

func (*FeatureSet) Has

func (fs *FeatureSet) Has(f Feature) bool

func (*FeatureSet) Intersect

func (fs *FeatureSet) Intersect(other *FeatureSet) *FeatureSet

func (*FeatureSet) List

func (fs *FeatureSet) List() []Feature

type HandshakeConfig

type HandshakeConfig struct {
	UserAgent   string
	NetworkID   uint32
	CrawlPublic bool

	// X-Protocol-Ctl advertisements. Peers gate feature-specific
	// messages on these flags in both directions.
	EnableLedgerReplay  bool
	EnableCompression   bool
	EnableVPReduceRelay bool
	EnableTxReduceRelay bool

	InstanceCookie     uint64
	ServerDomain       string
	PublicIP           net.IP // nil disables Local-IP emission and Remote-IP check
	LedgerHintProvider func() (hints LedgerHints, ok bool)
}

func DefaultHandshakeConfig

func DefaultHandshakeConfig() HandshakeConfig

type HandshakeError

type HandshakeError struct {
	Endpoint Endpoint
	Stage    string
	Err      error
}

HandshakeError provides detailed handshake failure information.

func NewHandshakeError

func NewHandshakeError(endpoint Endpoint, stage string, err error) *HandshakeError

NewHandshakeError creates a new HandshakeError.

func (*HandshakeError) Error

func (e *HandshakeError) Error() string

Error returns the error message.

func (*HandshakeError) Unwrap

func (e *HandshakeError) Unwrap() error

Unwrap returns the underlying error.

type HandshakeExtras

type HandshakeExtras struct {
	ServerDomain      string
	NetworkID         string
	ClosedLedger      [32]byte
	PreviousLedger    [32]byte
	HasClosedLedger   bool
	HasPreviousLedger bool
	// Raw version headers; applyHandshakeExtras picks one by direction,
	// mirroring PeerImp::getVersion (PeerImp.cpp:381-386).
	UserAgentHeader string
	ServerHeader    string
}

HandshakeExtras carries the typed headers ParseHandshakeExtras surfaces. Instance-Cookie / Local-IP / Remote-IP round-trip on the wire but are validated-and-discarded (matching rippled PeerImp).

func ParseHandshakeExtras

func ParseHandshakeExtras(
	headers http.Header,
	localPublicIP net.IP,
	peerRemote net.IP,
) (HandshakeExtras, error)

ParseHandshakeExtras enforces the post-signature checks: ledger-hash malformed (PeerImp.cpp:175-191), Previous-without-Closed (PeerImp.cpp:193-194), Local-IP / Remote-IP consistency (Handshake.cpp:325-359). Server-Domain is validated separately by ValidateServerDomain (which must run first to match rippled's order). Instance-Cookie is emitted on the wire but never parsed (rippled's verifyHandshake doesn't inspect it). peerRemote == nil disables the IP comparisons.

type Identity

type Identity struct {
	// contains filtered or unexported fields
}

Identity is the node's cryptographic identity used for peer authentication. The TLS keypair is generated lazily and cached — RSA-2048 keygen is 50–200 ms and we'd otherwise pay it on every dial.

func GenerateIdentity

func GenerateIdentity() (*Identity, error)

GenerateIdentity is an alias for NewIdentity.

func LoadIdentity

func LoadIdentity(dataDir string) (*Identity, error)

func NewIdentity

func NewIdentity() (*Identity, error)

func NewIdentityFromPrivateKey

func NewIdentityFromPrivateKey(privKeyHex string) (*Identity, error)

func NewIdentityFromSeed

func NewIdentityFromSeed(seed []byte) (*Identity, error)

func (*Identity) BtcecPublicKey

func (i *Identity) BtcecPublicKey() *btcec.PublicKey

func (*Identity) EncodedPublicKey

func (i *Identity) EncodedPublicKey() string

EncodedPublicKey returns the base58 'n...' form used in XRPL handshakes.

func (*Identity) PrivateKeyHex

func (i *Identity) PrivateKeyHex() string

PrivateKeyHex returns the private key as hex with the "00" prefix.

func (*Identity) PublicKey

func (i *Identity) PublicKey() []byte

PublicKey returns the raw compressed public key bytes.

func (*Identity) PublicKeyHex

func (i *Identity) PublicKeyHex() string

func (*Identity) Save

func (i *Identity) Save(dataDir string) error

func (*Identity) Sign

func (i *Identity) Sign(message []byte) ([]byte, error)

Sign hashes message with sha512Half then signs (DER ECDSA).

func (*Identity) SignDigest

func (i *Identity) SignDigest(digest []byte) ([]byte, error)

SignDigest signs a pre-hashed 32-byte digest (used for session sigs).

func (*Identity) TLSCertificate

func (i *Identity) TLSCertificate() tls.Certificate

TLSCertificate is the crypto/tls form of TLSCertificatePEM, used by stdlib call sites (RPC HTTPS, WebSocket).

func (*Identity) TLSCertificatePEM

func (i *Identity) TLSCertificatePEM() (certPEM, keyPEM []byte, err error)

TLSCertificatePEM returns a self-signed RSA-2048 TLS keypair (cached per Identity). The cert is unrelated to the secp256k1 node identity; peer trust comes from the Public-Key handshake header.

type IgnoredSquelchCallback

type IgnoredSquelchCallback func(peerID PeerID)

IgnoredSquelchCallback is called when a peer keeps relaying a validator's messages after being placed in the Squelched state — i.e., the peer is ignoring a TMSquelch we previously sent it.

Mirrors rippled's ignored_squelch_callback at Slot.h:112-113, invoked inside Slot::update at Slot.h:329-331. The overlay uses this signal to charge the offending peer's reputation (bad-data balance) so sustained squelch-violations eventually evict the peer rather than leaving squelch enforcement purely advisory.

Invoked from the hot receive path, OUTSIDE any slot mutex — implementations must be non-blocking. The callback carries only the peerID because the charge is per-peer: a peer can ignore the squelch for multiple validators, and each event should land on the same peer's balance.

type InboundMessage

type InboundMessage struct {
	// PeerID identifies the sender.
	PeerID PeerID

	// Type is the message type.
	Type uint16

	// Payload is the raw message payload.
	Payload []byte
}

InboundMessage represents a message received from a peer. This is exposed to consumers of the Overlay.

type LedgerDataType

type LedgerDataType int

LedgerDataType represents the type of ledger data being requested.

const (
	// LedgerDataTypeUnknown is an unknown data type.
	LedgerDataTypeUnknown LedgerDataType = iota
	// LedgerDataTypeHeader is the ledger header.
	LedgerDataTypeHeader
	// LedgerDataTypeAccountState is account state data.
	LedgerDataTypeAccountState
	// LedgerDataTypeTransactionNode is transaction tree nodes.
	LedgerDataTypeTransactionNode
	// LedgerDataTypeTransactionSetCandidate is transaction set candidates.
	LedgerDataTypeTransactionSetCandidate
)

type LedgerHints

type LedgerHints struct {
	Closed [32]byte
	Parent [32]byte
}

type LedgerProvider

type LedgerProvider interface {
	// GetLedgerHeader returns the header for a ledger.
	GetLedgerHeader(hash []byte, seq uint32) ([]byte, error)
	// GetAccountStateNode returns an account state node.
	GetAccountStateNode(ledgerHash []byte, nodeID []byte) ([]byte, error)
	// GetTransactionNode returns a transaction tree node.
	GetTransactionNode(ledgerHash []byte, nodeID []byte) ([]byte, error)
	// GetReplayDelta returns the serialized ledger header and every
	// transaction leaf blob (in tx-map order) for the given ledger hash.
	// Implementations must only return data for closed/immutable ledgers
	// (mirrors rippled's ledger->isImmutable() check in
	// LedgerReplayMsgHandler::processReplayDeltaRequest). When the ledger
	// is unknown or not yet immutable, return (nil, nil, nil) so the
	// handler can reply with reNO_LEDGER.
	GetReplayDelta(ledgerHash []byte) (header []byte, txLeaves [][]byte, err error)
	// GetProofPath returns the serialized ledger header and the wire-order
	// node path proving the existence of `key` in the requested map of
	// the given ledger. mapType selects the source map:
	//   - LedgerMapTransaction (1)  → tx map
	//   - LedgerMapAccountState (2) → account-state map
	//
	// Wire path orientation is leaf-to-root, matching both
	// shamap.GetProofPath and rippled's SHAMap::getProofPath
	// (rippled/src/xrpld/shamap/detail/SHAMapSync.cpp:800-833) — that
	// implementation pops a stack whose top is the leaf, yielding
	// leaf-first blobs which are then verified by reverse iteration in
	// SHAMap::verifyProofPath (same file, line 847).
	//
	// Return contract:
	//   - (nil, nil, ErrLedgerNotFound) when the ledger is unknown or not
	//     yet immutable. The handler emits ReplyErrorNoLedger.
	//   - (nil, nil, ErrKeyNotFound) when the ledger exists but the key
	//     has no leaf in the selected map. The handler emits
	//     ReplyErrorNoNode. Rippled returns reNO_NODE without a header
	//     in this case (LedgerReplayMsgHandler.cpp:84-90, where header
	//     packing happens AFTER the no-path early-return), so we mirror
	//     that and do not require the header here.
	//   - (header, path, nil) on success.
	//   - any other error → handler emits ReplyErrorBadRequest and logs
	//     at warn.
	GetProofPath(ledgerHash []byte, key []byte, mapType message.LedgerMapType) (header []byte, path [][]byte, err error)
}

LedgerProvider is called to retrieve ledger data for responses.

type LedgerRequest

type LedgerRequest struct {
	LedgerHash   []byte
	LedgerSeq    uint32
	DataType     LedgerDataType
	NodeIDs      [][]byte // Specific nodes to request
	State        RequestState
	CreatedAt    time.Time
	SentAt       time.Time
	CompletedAt  time.Time
	Peer         PeerID
	ResponseData [][]byte
	Error        error
}

LedgerRequest represents a request for ledger data.

type LedgerSyncHandler

type LedgerSyncHandler struct {
	// contains filtered or unexported fields
}

LedgerSyncHandler handles ledger synchronization messages.

func NewLedgerSyncHandler

func NewLedgerSyncHandler(events chan<- Event) *LedgerSyncHandler

NewLedgerSyncHandler creates a new ledger sync handler.

func (*LedgerSyncHandler) CleanupExpiredRequests

func (h *LedgerSyncHandler) CleanupExpiredRequests()

CleanupExpiredRequests removes timed-out requests.

func (*LedgerSyncHandler) CreateRequest

func (h *LedgerSyncHandler) CreateRequest(ledgerHash []byte, ledgerSeq uint32, dataType LedgerDataType) *LedgerRequest

CreateRequest creates a new ledger data request.

func (*LedgerSyncHandler) DroppedResponses

func (h *LedgerSyncHandler) DroppedResponses() uint64

DroppedResponses returns the cumulative count of ledger-sync responses dropped due to back-pressure on the events channel. Surfaced by the overlay's DroppedLedgerResponses.

func (*LedgerSyncHandler) GetPendingRequests

func (h *LedgerSyncHandler) GetPendingRequests(peerID PeerID) []*LedgerRequest

GetPendingRequests returns all pending requests for a peer.

func (*LedgerSyncHandler) HandleMessage

func (h *LedgerSyncHandler) HandleMessage(ctx context.Context, peerID PeerID, msg message.Message) error

HandleMessage handles a ledger sync message.

mtREPLAY_DELTA_RESPONSE intentionally has no arm here: orchestration of an outbound replay-delta acquisition (state machine, hash verification, adoption) lives in the consensus router, which receives the response via the overlay's Messages() channel. Delivering the response twice — once to the router, once to this handler — would create competing consumers and a race on the inbound-acquisition state.

func (*LedgerSyncHandler) PendingRequestCount

func (h *LedgerSyncHandler) PendingRequestCount() int

PendingRequestCount returns the number of pending requests.

func (*LedgerSyncHandler) PreferredPeersForLedger

func (h *LedgerSyncHandler) PreferredPeersForLedger(target [32]byte) []PeerID

PreferredPeersForLedger returns peer IDs whose last-known Closed-Ledger hash matches target. Empty when no lookup is wired or no peer matches. Filters by a single advertised hash only — does not replicate rippled's PeerImp::hasLedger(hash, seq) range/recent-set logic used by InboundLedger catchup.

func (*LedgerSyncHandler) SetLedgerDataCallback

func (h *LedgerSyncHandler) SetLedgerDataCallback(fn func(ctx context.Context, peerID PeerID, data *message.LedgerData))

SetLedgerDataCallback sets the callback for incoming ledger data.

func (*LedgerSyncHandler) SetPeerLedgerHintLookup

func (h *LedgerSyncHandler) SetPeerLedgerHintLookup(fn func(target [32]byte) []PeerID)

func (*LedgerSyncHandler) SetProvider

func (h *LedgerSyncHandler) SetProvider(provider LedgerProvider)

SetProvider sets the ledger data provider for responding to requests.

type Message

type Message = message.Message

Message is the interface implemented by all protocol messages.

func DecodeMessage

func DecodeMessage(msgType MessageType, data []byte) (Message, error)

DecodeMessage decodes a message from bytes using protobuf.

type MessageHeader

type MessageHeader = message.Header

Header represents a parsed message header.

func DecodeMessageHeader

func DecodeMessageHeader(buf []byte) (*MessageHeader, error)

DecodeMessageHeader decodes a message header from the provided buffer.

func ReadMessage

func ReadMessage(r io.Reader) (*MessageHeader, []byte, error)

ReadMessage reads a complete message from the reader.

type MessageType

type MessageType = message.MessageType

MessageType represents the type of a peer protocol message.

type Option

type Option func(*Config)

Option is a functional option for configuring the overlay.

func WithBootstrapPeers

func WithBootstrapPeers(peers ...string) Option

WithBootstrapPeers sets the initial peers to connect to.

func WithClock

func WithClock(clock func() time.Time) Option

WithClock sets the clock function (for testing).

func WithClusterNodes

func WithClusterNodes(entries ...string) Option

WithClusterNodes sets the [cluster_nodes] entries (base58 node pubkey + optional trailing comment used as the human-readable name). Each entry is parsed by cluster.Registry.Load at Overlay construction; a malformed value fails startup. Mirrors rippled's behavior in Application init where Cluster::load failure aborts the node.

func WithCompression

func WithCompression(enabled bool) Option

WithCompression enables or disables message compression.

func WithConnectTimeout

func WithConnectTimeout(d time.Duration) Option

WithConnectTimeout sets the connection timeout.

func WithDataDir

func WithDataDir(path string) Option

WithDataDir sets the data directory for persistent storage.

func WithEventBufferSize

func WithEventBufferSize(size int) Option

WithEventBufferSize sets the internal event channel buffer size.

func WithFixedPeers

func WithFixedPeers(peers ...string) Option

WithFixedPeers sets peers that should always be connected.

func WithHandshakeTimeout

func WithHandshakeTimeout(d time.Duration) Option

WithHandshakeTimeout sets the handshake timeout.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) Option

WithIdleTimeout sets the idle timeout before disconnecting a peer.

func WithLedgerReplay

func WithLedgerReplay(enabled bool) Option

WithLedgerReplay enables or disables the ledgerreplay X-Protocol-Ctl feature. When disabled we won't advertise replay support, so peers won't offer us mtREPLAY_DELTA_RESPONSE and won't accept replay requests from us — the catchup path falls back to legacy GetLedger.

func WithListenAddr

func WithListenAddr(addr string) Option

WithListenAddr sets the listen address for incoming connections.

func WithLocalValidatorPubKey

func WithLocalValidatorPubKey(key []byte) Option

WithLocalValidatorPubKey sets the compressed secp256k1 public key (33 bytes) of the local validator identity, so inbound TMSquelch frames targeting our own validator can be dropped. Observer nodes should omit this option (the filter becomes a no-op). Matches rippled PeerImp.cpp:2715-2721 which ignores TMSquelch addressed to app_.getValidationPublicKey().

func WithMaxInbound

func WithMaxInbound(n int) Option

WithMaxInbound sets the maximum number of inbound connections.

func WithMaxOutbound

func WithMaxOutbound(n int) Option

WithMaxOutbound sets the maximum number of outbound connections.

func WithMaxPeers

func WithMaxPeers(n int) Option

WithMaxPeers sets the maximum total number of peers.

func WithMessageBufferSize

func WithMessageBufferSize(size int) Option

WithMessageBufferSize sets the inbound message channel buffer size.

func WithNetworkID

func WithNetworkID(id uint32) Option

WithNetworkID sets the network ID for peer validation.

func WithPingInterval

func WithPingInterval(d time.Duration) Option

WithPingInterval sets the ping interval for keepalive.

func WithPrivateMode

func WithPrivateMode(enabled bool) Option

WithPrivateMode enables private mode (don't share our address).

func WithPublicIP

func WithPublicIP(ip net.IP) Option

WithPublicIP sets the node's observed public address. Used to emit the `Local-IP` handshake header and to validate the peer's `Remote-IP` self-report. A nil or unspecified IP suppresses both.

func WithReduceRelay

func WithReduceRelay(enabled bool) Option

WithReduceRelay enables or disables reduce-relay optimization. Reduce-relay is opt-in and defaults to false to match rippled (Config.h:248, Config.cpp:755-762). Setting this to true activates both vprr and txrr via the Validate() cascade; callers who need one without the other should set EnableVPReduceRelay or EnableTxReduceRelay directly on the Config instead.

func WithServerDomain

func WithServerDomain(domain string) Option

WithServerDomain sets the operator domain emitted in the `Server-Domain` handshake header. An empty value suppresses the header (matching rippled's behavior when no domain is configured).

type Overlay

type Overlay struct {
	// contains filtered or unexported fields
}

Overlay is the central orchestrator for XRPL peer-to-peer networking. It manages peer connections, discovery, message routing, and the reduce-relay system.

func New

func New(opts ...Option) (*Overlay, error)

New creates a new Overlay with the provided options.

func (*Overlay) Broadcast

func (o *Overlay) Broadcast(msg []byte) error

Broadcast sends a message to all connected peers, unfiltered. Used for SELF-originated validator traffic (our own proposals and validations) and for non-validator messages (statusChange, etc.). Rippled deliberately skips the squelch filter for self-originated broadcasts in OverlayImpl.cpp:1133-1137; otherwise a peer that squelches our own pubkey would silence us to them.

For peer-originated validator messages that need to be gossip- forwarded, use RelayFromValidator which applies the squelch filter and excludes the originating peer.

func (*Overlay) BroadcastExcept

func (o *Overlay) BroadcastExcept(exceptPeer PeerID, msg []byte) error

BroadcastExcept sends a message to every connected peer except the one identified by exceptPeer. Used for gossip of peer-originated messages that are NOT per-validator (manifests) — the per-validator squelch filter in RelayFromValidator doesn't apply. Pass 0 for exceptPeer to fall through to a plain Broadcast. Mirrors rippled's OverlayImpl::foreach used to relay TMManifests at OverlayImpl.cpp:633-686.

func (*Overlay) Cluster

func (o *Overlay) Cluster() *cluster.Registry

Cluster returns the registry of cluster-trusted node identities loaded from [cluster_nodes]. Always non-nil post-construction.

func (*Overlay) ClusterJSON

func (o *Overlay) ClusterJSON() map[string]any

ClusterJSON returns the top-level cluster object for the `peers` RPC response, mirroring rippled doPeers (Peers.cpp:59-80).

func (*Overlay) Connect

func (o *Overlay) Connect(addr string) error

Connect initiates an outbound connection to the specified address.

func (*Overlay) DroppedLedgerResponses

func (o *Overlay) DroppedLedgerResponses() uint64

DroppedLedgerResponses returns the cumulative count of ledger-sync responses dropped due to a full events channel (see LedgerSyncHandler.sendReplayDeltaResponse / sendProofPathResponse). Same shape as DroppedMessages but for the server-side response path. Delegates to the handler's own counter so the two drop sites (handler-side events-channel drop and any future overlay-side drop tracked in droppedLedgerResponses) can both contribute.

func (*Overlay) DroppedMessages

func (o *Overlay) DroppedMessages() uint64

DroppedMessages returns the cumulative count of inbound messages the overlay had to drop because the downstream consumer channel was full. Surfaced via server_info/server_state for operators to detect consumer back-pressure — a nonzero and growing value indicates the router/engine can't keep up with network ingress.

func (*Overlay) Identity

func (o *Overlay) Identity() *Identity

Identity returns the node's identity.

func (*Overlay) IncPeerBadData

func (o *Overlay) IncPeerBadData(peerID PeerID, reason string) uint32

IncPeerBadData records an invalid-data event attributed to the peer with the given PeerID. Returns the new cumulative count, or 0 when the peer is unknown (gracefully no-ops). Exposed so higher layers that can't import *Peer directly — e.g., the consensus router, which only sees PeerID via InboundMessage — can still charge a peer for malformed/invalid payloads. `reason` is a short stable label for diagnostic logging; it's forwarded to Peer.IncBadData.

Use this as the single surface for higher-layer charge-backs: the peermanagement package already increments inline for events it detects itself (e.g., AddSquelch) so callers outside this package only need to cover the cases they detect themselves.

func (*Overlay) IsValidatorSquelchedOnPeer

func (o *Overlay) IsValidatorSquelchedOnPeer(peerID PeerID, validator []byte) bool

IsValidatorSquelchedOnPeer reports whether the local peer with the given PeerID currently has an active squelch for `validator`. It is the programmatic counterpart of peer.ExpireSquelch, which returns true when there is NO active squelch — this wrapper inverts so the name matches the usual intuition (true = this peer has been told to squelch this validator). Useful for end-to-end tests that verify TMSquelch was parsed and recorded by the receiver.

func (*Overlay) IssueSquelch

func (o *Overlay) IssueSquelch(validator []byte, peerID PeerID, squelch bool, duration time.Duration)

IssueSquelch hand-rolls a TMSquelch frame to the given peer, marking the given validator's messages as to-be-squelched (or cleared when squelch=false). This is the same path the reduce-relay system takes when it autonomously squelches a peer — mirroring rippled's OverlayImpl::squelch — but is exposed as a deliberate API so callers (including integration tests) can drive squelch state changes without having to reach a natural squelch threshold.

func (*Overlay) LedgerSync

func (o *Overlay) LedgerSync() *LedgerSyncHandler

LedgerSync returns the overlay's ledger-sync handler so callers in a higher layer (e.g., consensus startup) can wire a LedgerProvider that imports internal/ledger packages — which this layer cannot.

func (*Overlay) ListenAddr

func (o *Overlay) ListenAddr() string

ListenAddr returns the resolved address the overlay is accepting connections on, or the empty string if no listener is bound. Useful when the overlay was configured with port 0 (ephemeral) and the caller needs the actual port to drive a peer connection — e.g., integration tests that wire two overlays together on localhost.

func (*Overlay) Messages

func (o *Overlay) Messages() <-chan *InboundMessage

Messages returns a channel for receiving inbound messages.

func (*Overlay) OnValidatorMessage

func (o *Overlay) OnValidatorMessage(validatorKey []byte, peerID PeerID)

OnValidatorMessage is called by the consensus router on every inbound trusted proposal/validation so the reduce-relay state machine can select peers to squelch. Mirrors rippled's PeerImp::updateSlotAndSquelch (PeerImp.cpp:1737,2385,3013,3049).

Without this wiring the Relay.OnMessage loop never sees inbound activity and mtSQUELCH is never emitted — which was the pre-fix behavior the PR review caught.

func (*Overlay) PeerCount

func (o *Overlay) PeerCount() int

PeerCount returns the number of connected peers.

func (*Overlay) PeerSupports

func (o *Overlay) PeerSupports(peerID PeerID, f Feature) bool

PeerSupports reports whether the peer identified by peerID has advertised support for the given protocol feature via its handshake headers. Returns false when the peer is unknown, the handshake has not completed, or the feature was not negotiated. Used by higher layers (e.g., consensus catchup) to avoid issuing feature-gated requests to peers that would silently drop them.

func (*Overlay) Peers

func (o *Overlay) Peers() []PeerInfo

Peers returns information about all connected peers.

func (*Overlay) PeersJSON

func (o *Overlay) PeersJSON() []map[string]any

PeersJSON implements types.PeerSource for the `peers` RPC method, emitting the subset of rippled PeerImp::json (PeerImp.cpp:388-503) fields for which goXRPL has data.

func (*Overlay) PeersThatHave

func (o *Overlay) PeersThatHave(suppressionHash [32]byte) []PeerID

PeersThatHave returns the set of peer IDs known to have the message whose suppression-hash is `suppressionHash`. Entries are populated when we relay a validator message outward (RelayFromValidator) and expire after RelayedIndexTTL.

Returns nil when the hash is unknown or the bucket has aged out — callers treat both equivalently (nothing to feed the slot with beyond the current originPeer).

Thread-safe. The returned slice is a private copy the caller may mutate freely.

func (*Overlay) PeersWithClosedLedger

func (o *Overlay) PeersWithClosedLedger(target [32]byte) []PeerID

PeersWithClosedLedger returns peers whose last-known Closed-Ledger hash equals target. The hash is seeded from the handshake hint and refreshed by inbound mtSTATUS_CHANGE messages, mirroring the PeerImp::closedLedgerHash_ field in rippled. This is a primitive for callers that want a coarse "who advertised this LCL" filter; it is NOT an analogue of rippled's catchup peer selection, which goes through PeerImp::hasLedger(hash, seq) over [minLedger_, maxLedger_] and the recentLedgers_ ring — state goXRPL does not yet track per peer.

func (*Overlay) PingTimeoutDisconnects

func (o *Overlay) PingTimeoutDisconnects() uint64

PingTimeoutDisconnects returns the cumulative count of peers torn down because they failed to answer pings within pingTimeout. A nonzero, growing value flags either a flaky network or peers that have stopped servicing the overlay protocol.

func (*Overlay) RelayFromValidator

func (o *Overlay) RelayFromValidator(validator []byte, suppressionHash [32]byte, exceptPeer PeerID, msg []byte) error

RelayFromValidator forwards a peer-originated validator message (proposal or validation) to other connected peers, applying the per-peer squelch filter on the ORIGINATING validator's pubkey AND excluding the originating peer (exceptPeer). Pass 0 for exceptPeer when no peer should be excluded (e.g. tests that synthesize a relay).

suppressionHash is the consensus-router suppression key for this message (same [32]byte used by the dedup cache). Every peer we actually send to is recorded in the reverse index so a later duplicate arrival from ANOTHER peer can query Overlay.PeersThatHave(suppressionHash) and feed the reduce-relay slot with the full set of known-havers — matching rippled's haveMessage return from overlay_.relay at PeerImp.cpp:3010-3017 / 3044-3054.

Mirrors rippled's gossip-forward path in OverlayImpl::relay: the squelch is consulted before each outbound send (PeerImp.cpp:240-256) and expired squelches auto-clear via Peer.ExpireSquelch. Self-origin is handled by a separate code path (see Broadcast) that skips the filter entirely.

func (*Overlay) Run

func (o *Overlay) Run(ctx context.Context) error

Run starts the overlay and blocks until the context is cancelled.

func (*Overlay) Send

func (o *Overlay) Send(peerID PeerID, msg []byte) error

Send sends a message to a specific peer.

func (*Overlay) SetLedgerHintProvider

func (o *Overlay) SetLedgerHintProvider(fn func() (LedgerHints, bool))

SetLedgerHintProvider wires the hint source; nil suppresses headers.

func (*Overlay) SetPeerConnectCallback

func (o *Overlay) SetPeerConnectCallback(cb func(PeerID))

SetPeerConnectCallback registers a callback fired after a peer's handshake has completed and the peer is in the overlay's peer map. Same blocking contract as SetPeerDisconnectCallback: runs on the event loop and MUST NOT block. Passing nil clears the callback.

Used by the consensus router to send our local validator manifest to a freshly-connected peer (#372), so peers configured under validator-list publishing can resolve our signing key back to the trusted master.

func (*Overlay) SetPeerDisconnectCallback

func (o *Overlay) SetPeerDisconnectCallback(cb func(PeerID))

SetPeerDisconnectCallback registers a callback fired after a peer is removed from the overlay. The callback runs on the event-loop goroutine so implementations MUST NOT block — push to a channel if meaningful work is needed. Passing nil clears the callback.

This is the channel by which higher layers (e.g. the consensus router) are notified of disconnects so they can clean their own per-peer state. Prefer this over polling Peers().

func (*Overlay) SetPeerStatusPublisher

func (o *Overlay) SetPeerStatusPublisher(fn func(PeerStatusUpdate))

SetPeerStatusPublisher wires a sink for pubPeerStatus events. Mirrors rippled's NetworkOPs::pubPeerStatus (NetworkOPs.cpp:2514) — the overlay invokes this callback for every non-lostSync TMStatusChange after state has been recorded, mirroring PeerImp.cpp:1892-1963. Passing nil disconnects the sink.

func (*Overlay) SetValidLedgerProvider

func (o *Overlay) SetValidLedgerProvider(fn func() (seq uint32, age time.Duration, ok bool))

SetValidLedgerProvider wires the validated-ledger source used by handleStatusChange (PeerImp.cpp:1885-1890). ok=false suppresses tracking updates.

func (*Overlay) Stop

func (o *Overlay) Stop() error

Stop gracefully shuts down the overlay.

type Peer

type Peer struct {
	// contains filtered or unexported fields
}

Peer represents a connection to an XRPL peer node.

func NewPeer

func NewPeer(id PeerID, endpoint Endpoint, inbound bool, identity *Identity, events chan<- Event) *Peer

func (*Peer) AcceptConnection

func (p *Peer) AcceptConnection(conn net.Conn) error

AcceptConnection assigns conn to an inbound peer. Returns ErrAlreadyConnected if a Connect or earlier Accept is in flight.

func (*Peer) AddSquelch

func (p *Peer) AddSquelch(validator []byte, duration time.Duration) bool

AddSquelch records a squelch from this peer. Returns false (and removes any prior entry) on out-of-range duration or when the cap is hit by a NEW validator key. Both rejections charge bad-data fee.

func (*Peer) BadDataCount

func (p *Peer) BadDataCount() uint32

BadDataCount returns the running balance, clamped to non-negative.

func (*Peer) Capabilities

func (p *Peer) Capabilities() *PeerCapabilities

func (*Peer) CheckTracking

func (p *Peer) CheckTracking(peerSeq, validSeq uint32)

CheckTracking mirrors rippled PeerImp::checkTracking (PeerImp.cpp:1986-2005). CAS on the diverged branch keeps a concurrent Converged write from being clobbered (rippled holds recentLock_; CAS is the lock-free equivalent).

func (*Peer) Close

func (p *Peer) Close() error

func (*Peer) ClosedLedger

func (p *Peer) ClosedLedger() ([32]byte, bool)

ClosedLedger reports the peer's last closed-ledger hint, or ok=false.

func (*Peer) Connect

func (p *Peer) Connect(ctx context.Context, cfg PeerConfig) error

func (*Peer) DecayBadData

func (p *Peer) DecayBadData()

DecayBadData halves the balance. Called periodically by the overlay so transient errors don't accumulate to eviction.

func (*Peer) Endpoint

func (p *Peer) Endpoint() Endpoint

func (*Peer) ExpireSquelch

func (p *Peer) ExpireSquelch(validator []byte) bool

ExpireSquelch reports whether a message from validator may be relayed to this peer. Clears the entry if an existing squelch has expired.

func (*Peer) ID

func (p *Peer) ID() PeerID

func (*Peer) Inbound

func (p *Peer) Inbound() bool

Inbound returns true if this is an inbound connection.

func (*Peer) IncBadData

func (p *Peer) IncBadData(reason string) uint32

IncBadData adds BadDataWeight(reason) to the running balance and returns the new total (clamped to non-negative).

func (*Peer) Info

func (p *Peer) Info() PeerInfo

func (*Peer) LastStatus

func (p *Peer) LastStatus() message.NodeStatus

LastStatus returns the peer's most recently advertised NodeStatus (rippled's last_status_.newstatus()). Returns 0 (nsUNKNOWN) when the peer has never sent a TMStatusChange with new_status, OR when the most recent TMStatusChange omitted new_status — both cases drop the stored value, mirroring rippled's `last_status_ = *m;` overwrite at PeerImp.cpp:1802 / 1807. This is what the `peers` RPC's `if (last_status.has_newstatus())` gate (PeerImp.cpp:463) reads; the per-event pubPeerStatus inheritance is a separate path that flows through applyStatusChange's return value.

func (*Peer) Latency

func (p *Peer) Latency() (time.Duration, bool)

func (*Peer) LedgerRange

func (p *Peer) LedgerRange() (uint32, uint32)

LedgerRange returns the peer's advertised (min, max) ledger sequence, or (0, 0) when no range has been advertised.

func (*Peer) Load

func (p *Peer) Load() int64

func (*Peer) NetworkID

func (p *Peer) NetworkID() string

NetworkID reports the peer's reported Network-ID handshake header, or "" if the peer omitted it. Mirrors rippled PeerImp's headers_["Network-ID"] passthrough (PeerImp.cpp:411-412).

func (*Peer) OnPong

func (p *Peer) OnPong(seq uint32, receivedAt time.Time)

OnPong correlates a Pong with the matching Ping by seq and updates the EWMA-smoothed latency. Mirrors PeerImp::onMessage(TMPing) (PeerImp.cpp:1099-1118): rtt is rounded to ms, then smoothed at ms granularity via latency = (latency*7 + rtt) / 8.

A matching pong also evicts every pending entry sent at-or-before the matched send-time. Rippled keeps a single in-flight ping (PeerImp.h:115 `std::optional<uint32_t> lastPingSeq_`), so its 60s ping-timeout fires only when the most-recent cycle goes unanswered. goXRPL ticks at 15s and may queue several pings while a pong is in transit; without this sweep, a peer that drops one ping per 60s window but otherwise responds promptly would be evicted by staleInFlightPing's oldest-wins check, while rippled would keep it. Clearing older entries makes the disconnect criterion "no pong for ≥pingTimeout" instead of "any single ping ≥pingTimeout old", matching rippled's single-cycle semantics.

func (*Peer) PreviousLedger

func (p *Peer) PreviousLedger() ([32]byte, bool)

PreviousLedger reports the peer's previous-ledger hint, or ok=false.

func (*Peer) ProtocolVersion

func (p *Peer) ProtocolVersion() string

ProtocolVersion returns the negotiated peer-protocol token (e.g. "XRPL/2.2") captured during the handshake, or "" if unknown.

func (*Peer) RemoteIP

func (p *Peer) RemoteIP() string

RemoteIP is the IP from the actual TCP connection (not the self-reported header).

func (*Peer) RemotePublicKey

func (p *Peer) RemotePublicKey() *PublicKeyToken

func (*Peer) RemoveSquelch

func (p *Peer) RemoveSquelch(validator []byte)

func (*Peer) Run

func (p *Peer) Run(ctx context.Context) error

Run starts read/write/ping loops; returns when any of them errors.

func (*Peer) Send

func (p *Peer) Send(data []byte) error

func (*Peer) ServerDomain

func (p *Peer) ServerDomain() string

func (*Peer) State

func (p *Peer) State() PeerState

State returns the current connection state.

func (*Peer) Tracking

func (p *Peer) Tracking() PeerTracking

type PeerCapabilities

type PeerCapabilities struct {
	Features *FeatureSet
	// contains filtered or unexported fields
}

PeerCapabilities holds only fields that the handshake actually populates — no protocol metadata stored as zero values.

func NewPeerCapabilities

func NewPeerCapabilities() *PeerCapabilities

func (*PeerCapabilities) HasFeature

func (pc *PeerCapabilities) HasFeature(f Feature) bool

func (*PeerCapabilities) SupportsCompression

func (pc *PeerCapabilities) SupportsCompression() bool

func (*PeerCapabilities) SupportsReduceRelay

func (pc *PeerCapabilities) SupportsReduceRelay() bool

type PeerConfig

type PeerConfig struct {
	SendBufferSize int
	PeerTLSConfig  *peertls.Config
}

func DefaultPeerConfig

func DefaultPeerConfig() PeerConfig

DefaultPeerConfig returns defaults; callers must set PeerTLSConfig before Connect.

type PeerError

type PeerError struct {
	PeerID   PeerID
	Endpoint Endpoint
	Op       string
	Err      error
}

PeerError wraps an error with peer context.

func NewEndpointError

func NewEndpointError(endpoint Endpoint, op string, err error) *PeerError

NewEndpointError creates a new PeerError with endpoint context.

func NewPeerError

func NewPeerError(peerID PeerID, op string, err error) *PeerError

NewPeerError creates a new PeerError.

func (*PeerError) Error

func (e *PeerError) Error() string

Error returns the error message.

func (*PeerError) Unwrap

func (e *PeerError) Unwrap() error

Unwrap returns the underlying error.

type PeerID

type PeerID uint64

PeerID is a unique identifier for a connected peer.

type PeerInfo

type PeerInfo struct {
	ID             PeerID
	Endpoint       Endpoint
	Inbound        bool
	State          PeerState
	PublicKey      string
	PublicKeyBytes []byte
	ConnectedAt    time.Time
	MessagesIn     uint64
	MessagesOut    uint64

	ServerDomain    string
	NetworkID       string
	Version         string
	ClosedLedger    string
	CompleteLedgers string
	Tracking        PeerTracking
	Load            int64

	Latency    time.Duration
	HasLatency bool

	Protocol string

	Status message.NodeStatus

	// Per-peer wire byte counters and rolling-window throughput.
	// Mirrors rippled PeerImp::metrics_ (PeerImp.h:226-230). Emitted
	// under the `metrics` object in `peers` RPC.
	TotalBytesRecv uint64
	TotalBytesSent uint64
	AvgBpsRecv     uint64
	AvgBpsSent     uint64
}

PeerInfo is a read-only snapshot of peer state.

type PeerReservation

type PeerReservation struct {
	NodeID      string `json:"node_id"`
	Description string `json:"description,omitempty"`
}

PeerReservation represents a reserved peer slot.

type PeerScore

type PeerScore struct {

	// Positive factors
	MessagesRelayed uint64
	ValidMessages   uint64
	Uptime          uint64

	// Negative factors
	InvalidMessages uint64
	Timeouts        uint64
	Disconnects     uint64
	// contains filtered or unexported fields
}

PeerScore tracks peer quality for connection decisions.

func NewPeerScore

func NewPeerScore() *PeerScore

NewPeerScore creates a new PeerScore.

func (*PeerScore) RecordDisconnect

func (ps *PeerScore) RecordDisconnect()

RecordDisconnect records a disconnect.

func (*PeerScore) RecordInvalidMessage

func (ps *PeerScore) RecordInvalidMessage()

RecordInvalidMessage records an invalid message received.

func (*PeerScore) RecordTimeout

func (ps *PeerScore) RecordTimeout()

RecordTimeout records a timeout.

func (*PeerScore) RecordValidMessage

func (ps *PeerScore) RecordValidMessage()

RecordValidMessage records a valid message received.

func (*PeerScore) Score

func (ps *PeerScore) Score() int

Score calculates the peer's overall score.

type PeerState

type PeerState int

PeerState represents the peer connection state.

const (
	PeerStateDisconnected PeerState = iota
	PeerStateConnecting
	PeerStateConnected
	PeerStateClosing
)

func (PeerState) String

func (s PeerState) String() string

String returns the string representation of PeerState.

type PeerStatusUpdate

type PeerStatusUpdate struct {
	// Status mirrors PeerImp.cpp:1895-1915. UPPERCASE spelling.
	// Carries the post-inheritance value returned by applyStatusChange,
	// so a status-less wire message still emits the prior enum once
	// (rippled's `m->set_newstatus(status)` mutation at PeerImp.cpp:1808).
	Status string
	// Action mirrors PeerImp.cpp:1917-1934 — CLOSING_LEDGER,
	// ACCEPTED_LEDGER, SWITCHED_LEDGER. LOST_SYNC is unreachable
	// because handleStatusChange returns at PeerImp.cpp:1830 before
	// the publish.
	Action string
	// LedgerHash mirrors PeerImp.cpp:1941-1949: rippled re-reads the
	// peer's closedLedgerHash_ under recentLock_ rather than echoing
	// the raw wire bytes. When wire bytes were malformed, that stored
	// hash is zeroed at PeerImp.cpp:1850 and rippled emits the
	// 64-char zero hex string — so callers must ALWAYS emit a value
	// when has_ledgerhash, falling back to "00…00" if the peer's
	// post-apply state was cleared.
	LedgerHash string
	// LedgerIndex mirrors PeerImp.cpp:1936-1939 (`has_ledgerseq`).
	// nil = field absent; non-nil = emit (even when value is 0 — a
	// peer can legitimately advertise the genesis seq).
	LedgerIndex *uint32
	// Date mirrors PeerImp.cpp:1951-1954 (`has_networktime`). rippled
	// auto-stamps this at PeerImp.cpp:1796-1797 when the wire didn't
	// carry it, so handleStatusChange does the same and Date is
	// always non-nil here.
	Date *uint32
	// LedgerIndexMin / LedgerIndexMax mirror PeerImp.cpp:1956-1960
	// (`has_firstseq && has_lastseq`). Both are nil unless both wire
	// fields were present.
	LedgerIndexMin *uint32
	LedgerIndexMax *uint32
}

PeerStatusUpdate captures the post-decode TMStatusChange fields the RPC layer needs to materialize a peer_status WebSocket event. Pointer fields preserve protobuf has-presence; nil means "wire field absent, rippled's `if (m->has_xxx)` gate is false" and the RPC layer omits the JSON field. Mirrors PeerImp.cpp:1892-1963.

type PeerTracking

type PeerTracking int32

PeerTracking mirrors rippled PeerImp::Tracking (PeerImp.h:58).

const (
	PeerTrackingUnknown PeerTracking = iota
	PeerTrackingConverged
	PeerTrackingDiverged
)

type PublicKeyToken

type PublicKeyToken struct {
	// contains filtered or unexported fields
}

PublicKeyToken is a peer's secp256k1 node public key.

func NewPublicKeyToken

func NewPublicKeyToken(data []byte) (*PublicKeyToken, error)

func NewPublicKeyTokenFromBtcec

func NewPublicKeyTokenFromBtcec(key *btcec.PublicKey) *PublicKeyToken

func ParsePublicKeyToken

func ParsePublicKeyToken(encoded string) (*PublicKeyToken, error)

ParsePublicKeyToken decodes a base58 'n...' node public key.

func VerifyPeerHandshake

func VerifyPeerHandshake(headers http.Header, sharedValue []byte, localPubKey string, cfg HandshakeConfig) (*PublicKeyToken, error)

VerifyPeerHandshake runs the post-Server-Domain rippled verify chain: Network-ID → Network-Time → Public-Key → Session-Signature → self-connection. Callers must run ValidateServerDomain first.

func (*PublicKeyToken) BtcecKey

func (p *PublicKeyToken) BtcecKey() *btcec.PublicKey

func (*PublicKeyToken) Bytes

func (p *PublicKeyToken) Bytes() []byte

func (*PublicKeyToken) Encode

func (p *PublicKeyToken) Encode() string

func (*PublicKeyToken) Equal

func (p *PublicKeyToken) Equal(other *PublicKeyToken) bool

type RecentEndpoints

type RecentEndpoints struct {
	// contains filtered or unexported fields
}

RecentEndpoints tracks recently seen endpoints from a peer.

func NewRecentEndpoints

func NewRecentEndpoints() *RecentEndpoints

NewRecentEndpoints creates a new RecentEndpoints tracker.

func (*RecentEndpoints) Expire

func (r *RecentEndpoints) Expire()

Expire removes old entries from the cache.

func (*RecentEndpoints) Filter

func (r *RecentEndpoints) Filter(endpoint string, hops uint32) bool

Filter returns true if we should NOT send this endpoint to the peer.

func (*RecentEndpoints) Insert

func (r *RecentEndpoints) Insert(endpoint string, hops uint32)

Insert records an endpoint as recently seen.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay manages reduce-relay for all validators.

func NewRelay

func NewRelay(cfg *Config, onSquelch SquelchCallback) *Relay

NewRelay creates a new Relay manager with no ignored-squelch callback. Equivalent to NewRelayWithIgnoredCallback(cfg, onSquelch, nil). Existing callers (and tests that don't exercise G4) keep using this.

func NewRelayWithIgnoredCallback

func NewRelayWithIgnoredCallback(
	cfg *Config,
	onSquelch SquelchCallback,
	onIgnoredSquelch IgnoredSquelchCallback,
) *Relay

NewRelayWithIgnoredCallback creates a Relay that additionally invokes onIgnoredSquelch(peerID) every time a peer in the Squelched state relays a validator's message. The callback is propagated to every ValidatorSlot the Relay creates. Pass nil to disable the charge (falls back to NewRelay semantics).

func (*Relay) GetSelectedPeers

func (r *Relay) GetSelectedPeers(validatorKey []byte) []PeerID

GetSelectedPeers returns selected peers for a validator.

func (*Relay) OnMessage

func (r *Relay) OnMessage(validatorKey []byte, peerID PeerID)

OnMessage handles an incoming validator message.

func (*Relay) RemovePeer

func (r *Relay) RemovePeer(peerID PeerID)

RemovePeer removes a peer from all validator slots.

type RelayPeerInfo

type RelayPeerInfo struct {
	State       RelayPeerState
	Count       int
	Expire      time.Time
	LastMessage time.Time
}

RelayPeerInfo holds information about a peer in the reduce-relay system.

type RelayPeerState

type RelayPeerState int

RelayPeerState represents the state of a peer in the reduce-relay system.

const (
	RelayPeerCounting RelayPeerState = iota
	RelayPeerSelected
	RelayPeerSquelched
)

func (RelayPeerState) String

func (s RelayPeerState) String() string

String returns the string representation of RelayPeerState.

type RelaySlotState

type RelaySlotState int

RelaySlotState represents the state of a validator slot.

const (
	RelaySlotCounting RelaySlotState = iota
	RelaySlotSelected
)

type RequestState

type RequestState int

RequestState tracks the state of a ledger data request.

const (
	// RequestStatePending means the request is waiting to be sent.
	RequestStatePending RequestState = iota
	// RequestStateSent means the request has been sent.
	RequestStateSent
	// RequestStateReceived means the response has been received.
	RequestStateReceived
	// RequestStateFailed means the request failed.
	RequestStateFailed
	// RequestStateTimeout means the request timed out.
	RequestStateTimeout
)

type ReservationTable

type ReservationTable struct {
	// contains filtered or unexported fields
}

ReservationTable manages peer reservations.

func NewReservationTable

func NewReservationTable(dataDir string) *ReservationTable

NewReservationTable creates a new reservation table.

func (*ReservationTable) Contains

func (t *ReservationTable) Contains(nodeID string) bool

Contains returns true if the node has a reservation.

func (*ReservationTable) Insert

func (t *ReservationTable) Insert(r *PeerReservation)

Insert adds a reservation.

type Slot

type Slot struct {
	// contains filtered or unexported fields
}

Slot represents a peer connection slot with its state and properties.

func NewInboundSlot

func NewInboundSlot(localEndpoint, remoteEndpoint net.Addr, fixed bool) *Slot

NewInboundSlot creates a new slot for an inbound connection.

func NewOutboundSlot

func NewOutboundSlot(remoteEndpoint net.Addr, fixed bool) *Slot

NewOutboundSlot creates a new slot for an outbound connection.

func (*Slot) Activate

func (s *Slot) Activate()

Activate transitions the slot to the active state.

func (*Slot) Fixed

func (s *Slot) Fixed() bool

Fixed returns true if this is a fixed connection.

func (*Slot) Inbound

func (s *Slot) Inbound() bool

Inbound returns true if this is an inbound connection.

func (*Slot) IsActive

func (s *Slot) IsActive() bool

IsActive returns true if the slot is in the active state.

func (*Slot) RemoteEndpoint

func (s *Slot) RemoteEndpoint() net.Addr

RemoteEndpoint returns the remote endpoint.

func (*Slot) SetState

func (s *Slot) SetState(state SlotState)

SetState updates the connection state.

func (*Slot) State

func (s *Slot) State() SlotState

State returns the current connection state.

type SlotState

type SlotState int

SlotState represents the connection state of a peer slot.

const (
	SlotStateAccept SlotState = iota
	SlotStateConnect
	SlotStateConnected
	SlotStateActive
	SlotStateClosing
)

func (SlotState) String

func (s SlotState) String() string

String returns the string representation of the state.

type SquelchCallback

type SquelchCallback func(validator []byte, peerID PeerID, squelch bool, duration time.Duration)

SquelchCallback is called when a peer should be squelched/unsquelched.

type TrafficCategory

type TrafficCategory int

TrafficCategory represents a traffic category for counting.

const (
	CategoryBase TrafficCategory = iota
	CategoryCluster
	CategoryOverlay
	CategoryManifests
	CategoryTransaction
	CategoryProposal
	CategoryValidation
	CategoryValidatorList
	CategorySquelch
	CategoryLedgerData
	CategoryTotal
	CategoryUnknown
)

func CategorizeMessage

func CategorizeMessage(msgType uint16) TrafficCategory

CategorizeMessage determines the traffic category for a message type.

func (TrafficCategory) String

func (c TrafficCategory) String() string

String returns the string representation of a category.

type TrafficCounter

type TrafficCounter struct {
	// contains filtered or unexported fields
}

TrafficCounter tracks ingress and egress traffic by category.

func NewTrafficCounter

func NewTrafficCounter() *TrafficCounter

NewTrafficCounter creates a new TrafficCounter.

func (*TrafficCounter) AddCount

func (tc *TrafficCounter) AddCount(cat TrafficCategory, inbound bool, bytes int)

AddCount records traffic for a category.

func (*TrafficCounter) GetStats

func (tc *TrafficCounter) GetStats(cat TrafficCategory) *TrafficStats

GetStats returns statistics for a category.

func (*TrafficCounter) GetTotalStats

func (tc *TrafficCounter) GetTotalStats() *TrafficStats

GetTotalStats returns the total traffic statistics.

func (*TrafficCounter) Reset

func (tc *TrafficCounter) Reset()

Reset resets all counters.

type TrafficStats

type TrafficStats struct {
	Name        string
	BytesIn     uint64
	BytesOut    uint64
	MessagesIn  uint64
	MessagesOut uint64
}

TrafficStats holds traffic statistics.

type ValidatorSlot

type ValidatorSlot struct {
	// contains filtered or unexported fields
}

ValidatorSlot manages peer selection for a specific validator.

func NewValidatorSlot

func NewValidatorSlot(maxSelected int, onSquelch SquelchCallback) *ValidatorSlot

NewValidatorSlot creates a new reduce-relay slot for a validator.

func (*ValidatorSlot) DeletePeer

func (s *ValidatorSlot) DeletePeer(validator []byte, peerID PeerID, erase bool)

DeletePeer handles peer disconnection.

func (*ValidatorSlot) GetSelected

func (s *ValidatorSlot) GetSelected() []PeerID

GetSelected returns the selected peer IDs.

func (*ValidatorSlot) Update

func (s *ValidatorSlot) Update(validator []byte, peerID PeerID)

Update processes a message from a peer.

Directories

Path Synopsis
Package cluster maintains the registry of cluster-trusted node identities — operators run a small set of nodes that they configure to know about each other via [cluster_nodes].
Package cluster maintains the registry of cluster-trusted node identities — operators run a small set of nodes that they configure to know about each other via [cluster_nodes].
Package message implements XRPL peer protocol message types and serialization.
Package message implements XRPL peer protocol message types and serialization.
Package peertls is the TLS 1.2 transport for XRPL peer connections.
Package peertls is the TLS 1.2 transport for XRPL peer connections.
shim
Package shim is the cgo binding for the OpenSSL TLS shim used by peertls.
Package shim is the cgo binding for the OpenSSL TLS shim used by peertls.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL