p2p

package
v0.0.0-...-3af30db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package p2p provides peer and transport abstractions for StreamHive.

TCP transport supports context-aware listen/dial, optional TLS, optional length-prefixed frames (SHV1), metrics, connection limits, and hooks for connect/disconnect.

Index

Constants

View Source
const HandshakeVersionV1 = "streamhive/1"

HandshakeVersionV1 is the initial handshake string carried in the first application frame.

Variables

View Source
var (
	// FrameMagic identifies StreamHive frames on the wire.
	FrameMagic = []byte("SHV1")

	// DefaultMaxFrameBytes caps a single frame payload (DoS bound).
	DefaultMaxFrameBytes = 4 << 20
)

Wire format: Magic (4) + length (4, big-endian) + payload.

View Source
var (
	// ErrBadMagic means the stream does not start with FrameMagic.
	ErrBadMagic = errors.New("p2p: bad frame magic")
	// ErrFrameTooLarge means the declared length exceeds the configured maximum.
	ErrFrameTooLarge = errors.New("p2p: frame too large")
)
View Source
var ErrAddrRequired = errors.New("p2p: listen address is required")

ErrAddrRequired is returned when ListenAddress is empty.

View Source
var ErrAlreadyListening = errors.New("p2p: already listening")

ErrAlreadyListening is returned when ListenAndAccept is called more than once.

View Source
var ErrTransportClosed = errors.New("p2p: transport closed")

ErrTransportClosed is returned when Dial is used after Close.

Functions

func ReadFrame

func ReadFrame(r *bufio.Reader, maxPayload int) ([]byte, error)

ReadFrame reads one frame using r. maxPayload bounds the declared length.

func WriteFrame

func WriteFrame(w io.Writer, payload []byte, maxPayload int) error

WriteFrame writes one length-prefixed frame. Payload must be <= maxPayload.

Types

type Peer

type Peer interface {
	RemoteAddr() net.Addr
	Close() error
	IsOutbound() bool
}

Peer is a remote endpoint connected over the network.

type TCPPeer

type TCPPeer struct {
	// contains filtered or unexported fields
}

TCPPeer is a TCP-backed Peer.

func NewTCPPeer

func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer

NewTCPPeer wraps a connection as a Peer.

func (*TCPPeer) Close

func (p *TCPPeer) Close() error

Close closes the connection.

func (*TCPPeer) Conn

func (p *TCPPeer) Conn() net.Conn

Conn returns the underlying connection for protocol codecs.

func (*TCPPeer) IsOutbound

func (p *TCPPeer) IsOutbound() bool

IsOutbound reports whether this peer was created from a dial (outbound).

func (*TCPPeer) RemoteAddr

func (p *TCPPeer) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

type TCPTransport

type TCPTransport struct {
	ListenAddress      string
	Listener           net.Listener
	OnPeer             func(Peer)
	OnPeerDisconnected func(Peer)
	// FrameHandler, if set, reads length-prefixed frames until error or handler error.
	FrameHandler func(ctx context.Context, peer Peer, payload []byte) error
	Logger       *slog.Logger

	// MaxPeers limits simultaneous peers (0 = unlimited).
	MaxPeers int
	// DialTimeout bounds each Dial when ctx has no earlier deadline (0 = only ctx).
	DialTimeout time.Duration
	// ReadIdleTimeout sets read deadlines on framed or discard read loops (0 = none).
	ReadIdleTimeout time.Duration
	// MaxFrameBytes caps ReadFrame payload size when using FrameHandler (0 = DefaultMaxFrameBytes).
	MaxFrameBytes int

	TLSServerConfig *tls.Config
	TLSClientConfig *tls.Config
	// contains filtered or unexported fields
}

TCPTransport listens on TCP and tracks connected peers.

func NewTCPTransport

func NewTCPTransport(listenAddr string) *TCPTransport

NewTCPTransport constructs a transport; ListenAddress must be non-empty before ListenAndAccept.

func (*TCPTransport) Addr

func (t *TCPTransport) Addr() net.Addr

Addr returns the bound listen address, or nil if not listening.

func (*TCPTransport) Close

func (t *TCPTransport) Close() error

Close shuts down the listener, waits for the accept loop, and closes peers.

func (*TCPTransport) Dial

func (t *TCPTransport) Dial(ctx context.Context, addr string) error

Dial opens an outbound TCP connection and registers the peer.

func (*TCPTransport) ListenAndAccept

func (t *TCPTransport) ListenAndAccept(ctx context.Context) error

ListenAndAccept binds TCP and starts accepting connections in the background.

func (*TCPTransport) Metrics

func (t *TCPTransport) Metrics() *TransportMetrics

Metrics returns transport counters.

func (*TCPTransport) Ready

func (t *TCPTransport) Ready() bool

Ready reports whether the transport has a bound listener.

type Transport

type Transport interface {
	ListenAndAccept(ctx context.Context) error
	Dial(ctx context.Context, address string) error
	Addr() net.Addr
	Close() error
	Ready() bool
	Metrics() *TransportMetrics
}

Transport listens for inbound peers and can dial outbound connections.

type TransportMetrics

type TransportMetrics struct {
	InboundAccepts   atomic.Uint64
	AcceptErrors     atomic.Uint64
	DialAttempts     atomic.Uint64
	DialSuccess      atomic.Uint64
	DialErrors       atomic.Uint64
	PeersRejected    atomic.Uint64
	ActivePeers      atomic.Int64
	FramesHandled    atomic.Uint64
	FrameHandlerErrs atomic.Uint64
}

TransportMetrics holds atomic counters for observability (logs, /metrics JSON, Prometheus adapters).

func NewTransportMetrics

func NewTransportMetrics() *TransportMetrics

NewTransportMetrics returns a zeroed metrics struct.

func (*TransportMetrics) Snapshot

func (m *TransportMetrics) Snapshot() map[string]int64

Snapshot returns a point-in-time copy suitable for JSON or logs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL