Documentation ¶
Overview ¶
Package p2p provides peer and transport abstractions for StreamHive.
The TCP transport supports context-aware listen and dial, optional TLS, optional length-prefixed framing (magic SHV1), metrics, connection limits, and connect/disconnect hooks.
Constants ¶
const HandshakeVersionV1 = "streamhive/1"
HandshakeVersionV1 is the initial handshake string carried in the first application frame.
Variables ¶
var (
	// FrameMagic identifies StreamHive frames on the wire.
	FrameMagic = []byte("SHV1")
	// DefaultMaxFrameBytes caps a single frame payload (DoS bound).
	DefaultMaxFrameBytes = 4 << 20
)
Wire format: magic (4 bytes) + payload length (4 bytes, big-endian) + payload.
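The wire format above can be illustrated with a small encoder and decoder. This is a minimal sketch under stated assumptions, not the package's implementation: writeFrame, readFrame, roundTrip, and the lowercase variables are hypothetical stand-ins that mirror the documented FrameMagic, DefaultMaxFrameBytes, ErrBadMagic, and ErrFrameTooLarge.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// Hypothetical stand-ins for the documented package values.
var (
	frameMagic       = []byte("SHV1")
	maxFrameBytes    = 4 << 20
	errBadMagic      = errors.New("bad frame magic")
	errFrameTooLarge = errors.New("frame too large")
)

// writeFrame emits the magic, a 4-byte big-endian payload length, then the payload.
func writeFrame(w io.Writer, payload []byte) error {
	if len(payload) > maxFrameBytes {
		return errFrameTooLarge
	}
	header := make([]byte, 8)
	copy(header, frameMagic)
	binary.BigEndian.PutUint32(header[4:], uint32(len(payload)))
	if _, err := w.Write(header); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame checks the magic and the declared length before allocating the payload.
func readFrame(r io.Reader) ([]byte, error) {
	header := make([]byte, 8)
	if _, err := io.ReadFull(r, header); err != nil {
		return nil, err
	}
	if !bytes.Equal(header[:4], frameMagic) {
		return nil, errBadMagic
	}
	n := binary.BigEndian.Uint32(header[4:])
	if uint64(n) > uint64(maxFrameBytes) {
		return nil, errFrameTooLarge
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip encodes payload into memory and decodes it again, returning the
// decoded payload and the number of bytes that were put on the wire.
func roundTrip(payload []byte) ([]byte, int, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, payload); err != nil {
		return nil, 0, err
	}
	wire := buf.Len()
	out, err := readFrame(&buf)
	return out, wire, err
}

func main() {
	out, wire, err := roundTrip([]byte("hello"))
	fmt.Println(string(out), wire, err) // prints "hello 13 <nil>"
}
```

Checking the declared length before allocating is what makes the size cap an effective DoS bound: a peer cannot force a large allocation by sending only an 8-byte header.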
var (
	// ErrBadMagic means the stream does not start with FrameMagic.
	ErrBadMagic = errors.New("p2p: bad frame magic")
	// ErrFrameTooLarge means the declared length exceeds the configured maximum.
	ErrFrameTooLarge = errors.New("p2p: frame too large")
)
var ErrAddrRequired = errors.New("p2p: listen address is required")
ErrAddrRequired is returned when ListenAddress is empty.
var ErrAlreadyListening = errors.New("p2p: already listening")
ErrAlreadyListening is returned when ListenAndAccept is called more than once.
var ErrTransportClosed = errors.New("p2p: transport closed")
ErrTransportClosed is returned when Dial is used after Close.
Functions ¶
Types ¶
type TCPPeer ¶
type TCPPeer struct {
	// contains filtered or unexported fields
}
TCPPeer is a TCP-backed Peer.
func NewTCPPeer ¶
NewTCPPeer wraps a connection as a Peer.
func (*TCPPeer) IsOutbound ¶
IsOutbound reports whether this peer was created from a dial (outbound).
func (*TCPPeer) RemoteAddr ¶
RemoteAddr returns the remote network address.
type TCPTransport ¶
type TCPTransport struct {
	ListenAddress string
	Listener net.Listener
	OnPeer func(Peer)
	OnPeerDisconnected func(Peer)
	// FrameHandler, if set, reads length-prefixed frames until error or handler error.
	FrameHandler func(ctx context.Context, peer Peer, payload []byte) error
	Logger *slog.Logger
	// MaxPeers limits simultaneous peers (0 = unlimited).
	MaxPeers int
	// DialTimeout bounds each Dial when ctx has no earlier deadline (0 = only ctx).
	DialTimeout time.Duration
	// ReadIdleTimeout sets read deadlines on framed or discard read loops (0 = none).
	ReadIdleTimeout time.Duration
	// MaxFrameBytes caps ReadFrame payload size when using FrameHandler (0 = DefaultMaxFrameBytes).
	MaxFrameBytes int
	TLSServerConfig *tls.Config
	TLSClientConfig *tls.Config
	// contains filtered or unexported fields
}
TCPTransport listens on TCP and tracks connected peers.
func NewTCPTransport ¶
func NewTCPTransport(listenAddr string) *TCPTransport
NewTCPTransport constructs a transport; ListenAddress must be non-empty before ListenAndAccept.
func (*TCPTransport) Addr ¶
func (t *TCPTransport) Addr() net.Addr
Addr returns the bound listen address, or nil if not listening.
func (*TCPTransport) Close ¶
func (t *TCPTransport) Close() error
Close shuts down the listener, waits for the accept loop, and closes peers.
func (*TCPTransport) Dial ¶
func (t *TCPTransport) Dial(ctx context.Context, addr string) error
Dial opens an outbound TCP connection and registers the peer.
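The DialTimeout rule documented on TCPTransport (the timeout applies only when ctx has no earlier deadline, and 0 means ctx alone governs) matches how context.WithTimeout behaves. The sketch below shows one way such a rule could be expressed; it is not taken from the package source, and dialContext, parentWins, and bounded are hypothetical helper names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// dialContext derives the context for a single dial attempt. A zero timeout
// leaves ctx untouched; otherwise context.WithTimeout keeps whichever
// deadline is earlier, the caller's or now+dialTimeout.
func dialContext(ctx context.Context, dialTimeout time.Duration) (context.Context, context.CancelFunc) {
	if dialTimeout <= 0 {
		return ctx, func() {}
	}
	return context.WithTimeout(ctx, dialTimeout)
}

// parentWins reports whether the caller's deadline is the one in effect
// after the dial timeout has been applied.
func parentWins(parentTimeout, dialTimeout time.Duration) bool {
	parent, cancelParent := context.WithTimeout(context.Background(), parentTimeout)
	defer cancelParent()
	ctx, cancel := dialContext(parent, dialTimeout)
	defer cancel()
	parentDeadline, _ := parent.Deadline()
	deadline, _ := ctx.Deadline()
	return deadline.Equal(parentDeadline)
}

// bounded reports whether a dial from a caller context without a deadline
// ends up with one.
func bounded(dialTimeout time.Duration) bool {
	ctx, cancel := dialContext(context.Background(), dialTimeout)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	fmt.Println(parentWins(time.Second, time.Hour)) // true: the caller's deadline is earlier
	fmt.Println(parentWins(time.Hour, time.Second)) // false: the dial timeout is earlier
	fmt.Println(bounded(0))                         // false: only ctx governs
	fmt.Println(bounded(time.Second))               // true: the dial timeout adds a deadline
}
```

A context derived this way could then be handed to net.Dialer.DialContext, which aborts the connection attempt when the context expires.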
func (*TCPTransport) ListenAndAccept ¶
func (t *TCPTransport) ListenAndAccept(ctx context.Context) error
ListenAndAccept binds TCP and starts accepting connections in the background.
func (*TCPTransport) Metrics ¶
func (t *TCPTransport) Metrics() *TransportMetrics
Metrics returns transport counters.
func (*TCPTransport) Ready ¶
func (t *TCPTransport) Ready() bool
Ready reports whether the transport has a bound listener.
type Transport ¶
type Transport interface {
	ListenAndAccept(ctx context.Context) error
	Dial(ctx context.Context, address string) error
	Addr() net.Addr
	Close() error
	Ready() bool
	Metrics() *TransportMetrics
}
Transport listens for inbound peers and can dial outbound connections.
type TransportMetrics ¶
type TransportMetrics struct {
	InboundAccepts atomic.Uint64
	AcceptErrors atomic.Uint64
	DialAttempts atomic.Uint64
	DialSuccess atomic.Uint64
	DialErrors atomic.Uint64
	PeersRejected atomic.Uint64
	ActivePeers atomic.Int64
	FramesHandled atomic.Uint64
	FrameHandlerErrs atomic.Uint64
}
TransportMetrics holds atomic counters for observability (logs, /metrics JSON, Prometheus adapters).
func NewTransportMetrics ¶
func NewTransportMetrics() *TransportMetrics
NewTransportMetrics returns a zeroed metrics struct.
func (*TransportMetrics) Snapshot ¶
func (m *TransportMetrics) Snapshot() map[string]int64
Snapshot returns a point-in-time copy suitable for JSON or logs.
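The snapshot pattern can be sketched with a reduced stand-in type. This is an illustration only: the metrics type, the snapshot and simulate functions, and the map keys are hypothetical, since the keys that Snapshot actually returns are not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
)

// metrics is a hypothetical stand-in carrying two of the documented counters.
type metrics struct {
	DialAttempts atomic.Uint64
	ActivePeers  atomic.Int64
}

// snapshot loads each counter once and returns plain values that are safe
// to marshal or log. The key names are illustrative.
func (m *metrics) snapshot() map[string]int64 {
	return map[string]int64{
		"dial_attempts": int64(m.DialAttempts.Load()),
		"active_peers":  m.ActivePeers.Load(),
	}
}

// simulate records n concurrent dial attempts, each adding one active peer,
// and returns a snapshot taken after all goroutines have finished.
func simulate(n int) map[string]int64 {
	m := &metrics{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.DialAttempts.Add(1)
			m.ActivePeers.Add(1)
		}()
	}
	wg.Wait()
	return m.snapshot()
}

func main() {
	out, err := json.Marshal(simulate(100))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints {"active_peers":100,"dial_attempts":100}
}
```

Each counter is loaded atomically on its own, so a snapshot taken while the transport is active is consistent per counter but not necessarily across counters.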