Documentation
¶
Index ¶
- Constants
- type ATCBus
- type ATCEvent
- type ATCEventType
- type ATCNode
- type ATCServer
- type ATCSnapshot
- type Client
- func (c *Client) Connect() error
- func (c *Client) Disconnect() error
- func (c *Client) IsConnected() bool
- func (c *Client) Listen()
- func (c *Client) OnData(fn func(data []byte))
- func (c *Client) OnDisconnect(fn func())
- func (c *Client) OnReject(fn func(reason string))
- func (c *Client) OnStream(fn func(data []byte))
- func (c *Client) Send(data []byte) error
- func (c *Client) SendEncrypted(data []byte, key string) error
- func (c *Client) SendStream(data []byte) error
- func (c *Client) SetEncryptionKey(key string)
- type Config
- type Peer
- func (p *Peer) Broadcast(data []byte)
- func (p *Peer) BroadcastStream(data []byte)
- func (p *Peer) Close() error
- func (p *Peer) Connect(host string, port int) (string, error)
- func (p *Peer) ConnectTo(info PeerInfo) (string, error)
- func (p *Peer) Disconnect(peerID string) error
- func (p *Peer) DisconnectAll()
- func (p *Peer) Discover(callback func([]PeerInfo)) error
- func (p *Peer) DiscoverWithTimeout(callback func([]PeerInfo), timeout time.Duration) error
- func (p *Peer) GetPeerID() string
- func (p *Peer) GetPeers() []string
- func (p *Peer) IsConnected() bool
- func (p *Peer) Listen() error
- func (p *Peer) OnConnect(fn func(peerID string))
- func (p *Peer) OnData(fn func(peerID string, data []byte))
- func (p *Peer) OnDisconnect(fn func(peerID string))
- func (p *Peer) OnStream(fn func(peerID string, data []byte))
- func (p *Peer) PeerCount() int
- func (p *Peer) Send(data []byte) error
- func (p *Peer) SendRouted(destID string, data []byte) error
- func (p *Peer) SendRoutedTTL(destID string, data []byte, ttl int) error
- func (p *Peer) SendStream(peerID string, data []byte) error
- func (p *Peer) SendTo(peerID string, data []byte) error
- func (p *Peer) StartDiscoveryListener() error
- type PeerConfig
- type PeerInfo
- type Server
- func (s *Server) Broadcast(data []byte)
- func (s *Server) BroadcastEncrypted(data []byte, key string)
- func (s *Server) BroadcastExcept(excludeID string, data []byte)
- func (s *Server) BroadcastExceptEncrypted(excludeID string, data []byte, key string)
- func (s *Server) BroadcastStream(data []byte)
- func (s *Server) ClientCount() int
- func (s *Server) Close() error
- func (s *Server) Drain() int
- func (s *Server) GetClients() []string
- func (s *Server) Kick(clientID string) error
- func (s *Server) Listen() error
- func (s *Server) OnConnect(fn func(clientID string))
- func (s *Server) OnData(fn func(clientID string, data []byte))
- func (s *Server) OnDisconnect(fn func(clientID string))
- func (s *Server) OnStream(fn func(clientID string, data []byte))
- func (s *Server) SendStream(clientID string, data []byte) error
- func (s *Server) SendTo(clientID string, data []byte) error
- func (s *Server) SendToEncrypted(clientID string, data []byte, key string) error
Constants ¶
const (
DefaultPort = 5000
DefaultProtocol = "tcp"
DefaultHost = "localhost"
)
const (
// DefaultChunkSize is the default chunk body size (64 KB).
DefaultChunkSize = 64 * 1024
)
const (
// MaxMessageSize caps a single message at 16 MB.
MaxMessageSize = 16 * 1024 * 1024
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ATCBus ¶
type ATCBus struct {
// contains filtered or unexported fields
}
ATCBus fans out events to all subscribed dashboard connections.
type ATCEvent ¶
type ATCEvent struct {
Type ATCEventType `json:"type"`
Timestamp int64 `json:"ts"` // unix milliseconds
NodeID string `json:"node_id"` // the client/peer UUID involved
TargetID string `json:"target_id,omitempty"` // for send events: who it's going to
Bytes int `json:"bytes,omitempty"` // data size
Message string `json:"msg,omitempty"` // human-readable log line
Mode string `json:"mode"` // "server", "p2p"
}
ATCEvent is a single telemetry event streamed to the dashboard.
type ATCEventType ¶
type ATCEventType string
ATCEventType identifies what happened.
const (
ATCEventConnect ATCEventType = "connect"
ATCEventDisconnect ATCEventType = "disconnect"
ATCEventDataSend ATCEventType = "data_send"
ATCEventDataRecv ATCEventType = "data_recv"
ATCEventStreamSend ATCEventType = "stream_send"
ATCEventStreamRecv ATCEventType = "stream_recv"
ATCEventPing ATCEventType = "ping"
ATCEventPong ATCEventType = "pong"
ATCEventReject ATCEventType = "reject"
ATCEventError ATCEventType = "error"
ATCEventServerUp ATCEventType = "server_up"
ATCEventDrain ATCEventType = "drain"
ATCEventSnapshot ATCEventType = "snapshot" // full state snapshot for new dashboard connections
)
type ATCNode ¶
type ATCNode struct {
ID string `json:"id"`
Label string `json:"label"` // short display label (first 8 chars of UUID)
Connected int64 `json:"connected"` // unix ms when connected
IP string `json:"ip,omitempty"`
}
ATCNode represents one connected client or peer in the snapshot.
type ATCServer ¶
type ATCServer struct {
// contains filtered or unexported fields
}
ATCServer is the HTTP server that serves the ATC dashboard and streams telemetry events via Server-Sent Events (SSE). No external dependencies.
Endpoints:
GET / - serves the ATC dashboard HTML
GET /events - SSE stream of real-time events
GET /snapshot - JSON snapshot of current state
type ATCSnapshot ¶
type ATCSnapshot struct {
Type ATCEventType `json:"type"`
Timestamp int64 `json:"ts"`
Mode string `json:"mode"`
ServerPort int `json:"server_port"`
Protocol string `json:"protocol"`
Nodes []ATCNode `json:"nodes"`
Encrypted bool `json:"encrypted"`
SelfID string `json:"self_id,omitempty"` // for P2P - our own peer ID
}
ATCSnapshot is a full state dump sent when a dashboard first connects.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects to a NetPipe server and exchanges raw data. Create one with NewClient(), register callbacks, Connect(), then go Listen().
func NewClient ¶
NewClient creates a client with the given config. Pass Config{} for all defaults (localhost:5000, TCP).
func (*Client) Connect ¶
Connect establishes the connection to the server. Works for both TCP and UDP - the protocol is set in Config.
func (*Client) Disconnect ¶
Disconnect closes the connection cleanly.
func (*Client) IsConnected ¶
IsConnected returns true if the client has an active connection.
func (*Client) Listen ¶
func (c *Client) Listen()
Listen runs a blocking read loop that fires OnData with complete messages. It is designed to be run in a goroutine:
go client.Listen()
When the connection drops, it fires OnDisconnect and then either returns or, if Config.AutoReconnect is set, attempts to reconnect.
func (*Client) OnData ¶
OnData registers a callback fired when data arrives from the server. Data is raw bytes - the developer decides what they mean.
func (*Client) OnDisconnect ¶
func (c *Client) OnDisconnect(fn func())
OnDisconnect registers a callback fired when the server disconnects.
func (*Client) OnReject ¶
OnReject registers a callback fired when the server rejects the connection. Reason is a short string like "server full" or "too many connections from your IP".
func (*Client) OnStream ¶
OnStream registers a callback fired when a chunked stream transfer completes. Data is the fully reassembled payload.
func (*Client) Send ¶
Send writes raw bytes to the server. Returns an error if not connected. The message is framed automatically so it arrives as a complete unit.
func (*Client) SendEncrypted ¶
SendEncrypted writes AES-256-GCM encrypted bytes to the server. The key can be any string - it is hashed to 32 bytes internally.
func (*Client) SendStream ¶
SendStream sends large data as a chunked stream. The server receives it as a single complete payload via OnStream.
func (*Client) SetEncryptionKey ¶
SetEncryptionKey sets the key used to auto-decrypt incoming encrypted messages. Call before Listen().
type Config ¶
type Config struct {
Host string // client only - defaults to "localhost"
Port int // defaults to 5000
Protocol string // defaults to "tcp" (also supports "udp")
EncryptionKey string // optional - server uses this to auto-decrypt incoming encrypted messages
// Stage 6 - Streaming
ChunkSize int // stream chunk size in bytes (default 64 KB)
MaxStreamsPerClient int // server only - 0 = unlimited. max concurrent incomplete streams per client
// Stage 7 - Robustness
MaxClients int // server only - 0 = unlimited
HeartbeatInterval time.Duration // server only - 0 = disabled. e.g. 10 * time.Second
HeartbeatTimeout time.Duration // server only - 0 = 2x interval. time to wait for pong before disconnect
IdleTimeout time.Duration // server only - 0 = disabled. disconnect clients with no data activity
ConnectTimeout time.Duration // server only - 0 = 30s default. max time a new client can sit before sending data
MaxConnsPerIP int // server only - 0 = unlimited. max simultaneous connections from one IP
DrainTimeout time.Duration // server only - 0 = 30s default. max wait time for Drain() to complete
AutoReconnect bool // client only - auto-reconnect on disconnect
MaxReconnectAttempts int // client only - 0 = infinite retries
ReconnectInterval time.Duration // client only - defaults to 2s (base for exponential backoff)
// Addons
EnableATC bool // enable the ATC visual dashboard addon. defaults to false
ATCPort int // ATC dashboard HTTP port. defaults to 5001
}
Config controls server or client behaviour. Every field has a sensible default - pass Config{} and it works out of the box.
type Peer ¶
type Peer struct {
// contains filtered or unexported fields
}
Peer is a P2P node that connects directly to other peers without a central server.
Every connection performs an automatic Diffie-Hellman key exchange - all traffic is AES-256-GCM encrypted by default. The developer never handles keys.
In a mesh, each peer maintains its own registry of connected peers. Any peer can send to any other peer it is directly connected to.
Usage:
peer := netpipe.NewPeer(netpipe.PeerConfig{Port: 6000})
peer.OnData(func(peerID string, data []byte) { ... })
go peer.Listen() // accept incoming connections
peer.Connect("192.168.1.10", 6000) // connect to another peer
peer.Broadcast([]byte("hello mesh"))
func NewPeer ¶
func NewPeer(cfg PeerConfig) *Peer
NewPeer creates a P2P peer node.
peer := netpipe.NewPeer(netpipe.PeerConfig{Port: 6000})
func (*Peer) BroadcastStream ¶
BroadcastStream sends large data to all connected peers as a chunked stream.
func (*Peer) Connect ¶
Connect initiates a direct connection to another peer by address. Performs the DH handshake and adds the peer to the mesh.
func (*Peer) Disconnect ¶
Disconnect cleanly disconnects one specific peer.
func (*Peer) DisconnectAll ¶
func (p *Peer) DisconnectAll()
DisconnectAll disconnects from every peer in the mesh.
func (*Peer) Discover ¶
Discover broadcasts a discovery request on the local network and collects responses from other NetPipe P2P peers. Blocks until the default discovery timeout elapses, then calls the callback with all discovered peers. Use DiscoverWithTimeout to choose a custom timeout.
peer.Discover(func(found []netpipe.PeerInfo) {
for _, p := range found {
peer.ConnectTo(p)
}
})
func (*Peer) DiscoverWithTimeout ¶
DiscoverWithTimeout is like Discover but with a custom timeout.
func (*Peer) IsConnected ¶
IsConnected returns true if there is at least one connected peer.
func (*Peer) Listen ¶
Listen starts accepting incoming peer connections. Blocks forever. Run in a goroutine: go peer.Listen()
func (*Peer) OnDisconnect ¶
OnDisconnect registers a callback fired when a peer leaves the mesh.
func (*Peer) OnStream ¶
OnStream registers a callback fired when a chunked stream completes from any peer.
func (*Peer) Send ¶
Send sends data to a single directly-connected peer (for 1:1 connections). If multiple peers are connected, use SendTo or Broadcast.
func (*Peer) SendRouted ¶
SendRouted sends data to a peer that may not be directly connected. The message is forwarded through intermediate peers using the route table. TTL limits the maximum number of hops (default 5).
func (*Peer) SendRoutedTTL ¶
SendRoutedTTL sends a routed message with a custom TTL.
func (*Peer) SendStream ¶
SendStream sends large data to a specific peer as a chunked stream.
func (*Peer) StartDiscoveryListener ¶
StartDiscoveryListener runs a background goroutine that responds to LAN discovery broadcasts from other peers. Call this on peers that want to be discoverable.
go peer.StartDiscoveryListener()
type PeerConfig ¶
type PeerConfig struct {
Port int // listening port - defaults to 6000
Protocol string // defaults to "tcp"
Name string // optional human-readable name (advertised during LAN discovery)
NoEncrypt bool // set to true to disable DH encryption (testing/trusted LAN only)
ChunkSize int // stream chunk size - defaults to 64 KB
// Addons
EnableATC bool // enable the ATC visual dashboard addon. defaults to false
ATCPort int // ATC dashboard HTTP port. defaults to 6001
}
PeerConfig controls P2P peer behaviour.
type PeerInfo ¶
type PeerInfo struct {
ID string `json:"id"`
Address string `json:"address"`
Port int `json:"port"`
Name string `json:"name"`
}
PeerInfo contains what each peer advertises during LAN discovery.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server manages client connections and routes data between them. Create one with NewServer(), register callbacks, then call Listen().
func NewServer ¶
NewServer creates a server with the given config. Pass Config{} for all defaults (port 5000, TCP).
func (*Server) BroadcastEncrypted ¶
BroadcastEncrypted sends AES-256-GCM encrypted data to every connected client.
func (*Server) BroadcastExcept ¶
BroadcastExcept sends data to every client except the one specified. Useful for relaying a sender's message to all other clients.
func (*Server) BroadcastExceptEncrypted ¶
BroadcastExceptEncrypted sends encrypted data to every client except one.
func (*Server) BroadcastStream ¶
BroadcastStream sends large data to every client as a chunked stream.
func (*Server) ClientCount ¶
ClientCount returns the number of currently connected clients.
func (*Server) Drain ¶
Drain stops accepting new connections but waits for existing clients to finish and disconnect gracefully. Blocks until all clients are gone or DrainTimeout expires (default 30s). Returns the number of clients that were still connected when the timeout hit.
func (*Server) GetClients ¶
GetClients returns a slice of all connected client UUIDs.
func (*Server) Kick ¶
Kick forcibly disconnects a specific client. For TCP: closes the connection, triggering the normal OnDisconnect path. For UDP: removes the client from the registry and fires OnDisconnect.
func (*Server) Listen ¶
Listen starts accepting client connections. Blocks forever. Works for both TCP and UDP - the protocol is set in Config.
func (*Server) OnConnect ¶
OnConnect registers a callback fired when a client connects. The clientID is the client's UUID.
func (*Server) OnData ¶
OnData registers a callback fired when a client sends data. Data is raw bytes - the developer decides what they mean.
func (*Server) OnDisconnect ¶
OnDisconnect registers a callback fired when a client disconnects. The client has already been removed from the registry at this point.
func (*Server) OnStream ¶
OnStream registers a callback fired when a chunked stream transfer completes. Data is the fully reassembled payload - the developer never sees individual chunks.
func (*Server) SendStream ¶
SendStream sends large data to a specific client as a chunked stream. The client receives it as a single complete payload via OnStream.