p2p

package v0.0.1
Published: May 24, 2022 License: Apache-2.0 Imports: 43 Imported by: 0

README

p2p

The p2p package provides an abstraction around peer-to-peer communication.

Docs:

  • Connection for details on how connections and multiplexing work
  • Peer for details on peer ID, handshakes, and peer exchange
  • Node for details about different types of nodes and how they should work
  • Pex for details on peer discovery and exchange
  • Config for details on some config options

Documentation

Constants

const EmptyNetAddress = "<nil-NetAddress>"

EmptyNetAddress defines the string representation of an empty NetAddress

const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "p2p"
)

const (
	// NodeIDByteLength is the length of a crypto.Address. Currently only 20.
	// FIXME: support other length addresses?
	NodeIDByteLength = crypto.AddressSize
)

const TestHost = "localhost"

Variables

This section is empty.

Functions

func AddPeerToSwitchPeerSet

func AddPeerToSwitchPeerSet(sw *Switch, peer Peer)

func Connect2Switches

func Connect2Switches(switches []*Switch, i, j int)

Connect2Switches will connect switches i and j via net.Pipe(). Blocks until a connection is established. NOTE: caller ensures i and j are within bounds.

func IDAddressString

func IDAddressString(id NodeID, protocolHostPort string) string

IDAddressString returns id@hostPort. It strips the leading protocol from protocolHostPort if it exists.
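The behavior described above can be illustrated with a standalone sketch that uses only the standard library. The helper idAddressString, its plain string ID parameter, and the assumption that the protocol prefix is delimited by "://" are hypothetical stand-ins, not the package's own code.

```go
package main

import (
	"fmt"
	"strings"
)

// idAddressString is a hypothetical stand-in for IDAddressString. It assumes
// the protocol, when present, is separated from host:port by "://".
func idAddressString(id, protocolHostPort string) string {
	hostPort := protocolHostPort
	if i := strings.Index(hostPort, "://"); i >= 0 {
		// Drop everything up to and including the "://" separator.
		hostPort = hostPort[i+len("://"):]
	}
	return id + "@" + hostPort
}

func main() {
	fmt.Println(idAddressString("deadbeef", "tcp://127.0.0.1:26656")) // deadbeef@127.0.0.1:26656
	fmt.Println(idAddressString("deadbeef", "127.0.0.1:26656"))       // deadbeef@127.0.0.1:26656
}
```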

func MConnConfig

func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig

MConnConfig returns an MConnConfig with fields updated from the P2PConfig.

func MaxNodeInfoSize

func MaxNodeInfoSize() int

MaxNodeInfoSize returns the maximum size of the NodeInfo struct.

func NetAddressesToProto

func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress

NetAddressesToProto converts a slice of NetAddresses into a Protobuf PexAddress slice. FIXME: Remove this when legacy PEX reactor is removed.

func StartSwitches

func StartSwitches(switches []*Switch) error

StartSwitches calls sw.Start() for each given switch. It returns the first encountered error.

Types

type AddrBook

type AddrBook interface {
	AddAddress(addr *NetAddress, src *NetAddress) error
	AddPrivateIDs([]string)
	AddOurAddress(*NetAddress)
	OurAddress(*NetAddress) bool
	MarkGood(NodeID)
	RemoveAddress(*NetAddress)
	HasAddress(*NetAddress) bool
	Save()
}

An AddrBook represents an address book from the pex package, which is used to store peer addresses.

type AddrBookMock

type AddrBookMock struct {
	Addrs        map[string]struct{}
	OurAddrs     map[string]struct{}
	PrivateAddrs map[string]struct{}
}

func (*AddrBookMock) AddAddress

func (book *AddrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error

func (*AddrBookMock) AddOurAddress

func (book *AddrBookMock) AddOurAddress(addr *NetAddress)

func (*AddrBookMock) AddPrivateIDs

func (book *AddrBookMock) AddPrivateIDs(addrs []string)

func (*AddrBookMock) HasAddress

func (book *AddrBookMock) HasAddress(addr *NetAddress) bool

func (*AddrBookMock) MarkGood

func (book *AddrBookMock) MarkGood(NodeID)

func (*AddrBookMock) OurAddress

func (book *AddrBookMock) OurAddress(addr *NetAddress) bool

func (*AddrBookMock) RemoveAddress

func (book *AddrBookMock) RemoveAddress(addr *NetAddress)

func (*AddrBookMock) Save

func (book *AddrBookMock) Save()

type BaseReactor

type BaseReactor struct {
	service.BaseService // Provides Start, Stop, .Quit
	Switch              *Switch
}

func NewBaseReactor

func NewBaseReactor(name string, impl Reactor) *BaseReactor

func (*BaseReactor) AddPeer

func (*BaseReactor) AddPeer(peer Peer)

func (*BaseReactor) GetChannels

func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor

func (*BaseReactor) InitPeer

func (*BaseReactor) InitPeer(peer Peer) Peer

func (*BaseReactor) Receive

func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte)

func (*BaseReactor) RemovePeer

func (*BaseReactor) RemovePeer(peer Peer, reason interface{})

func (*BaseReactor) SetSwitch

func (br *BaseReactor) SetSwitch(sw *Switch)

type Channel

type Channel struct {
	ID    ChannelID
	In    <-chan Envelope  // inbound messages (peers to reactors)
	Out   chan<- Envelope  // outbound messages (reactors to peers)
	Error chan<- PeerError // peer error reporting
	// contains filtered or unexported fields
}

Channel is a bidirectional channel to exchange Protobuf messages with peers, wrapped in Envelope to specify routing info (i.e. sender/receiver).

func NewChannel

func NewChannel(
	id ChannelID,
	messageType proto.Message,
	inCh <-chan Envelope,
	outCh chan<- Envelope,
	errCh chan<- PeerError,
) *Channel

NewChannel creates a new channel. It is primarily for internal and test use; reactors should use Router.OpenChannel().

func (*Channel) Close

func (c *Channel) Close()

Close closes the channel. Future sends on Out and Error will panic. The In channel remains open to avoid having to synchronize Router senders, which should use Done() to detect channel closure instead.

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

Done returns a channel that's closed when Channel.Close() is called.
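The Close/Done contract can be pictured with the following minimal sketch. The channel type, newChannel, and deliver are hypothetical names introduced for illustration and do not mirror the package's internals; the sketch only shows how a sender might consult Done() instead of relying on the inbound Go channel being closed.

```go
package main

import (
	"fmt"
	"sync"
)

// channel is a hypothetical, reduced model of a p2p Channel: the inbound Go
// channel is never closed, and closure is signalled through done instead.
type channel struct {
	in        chan string
	done      chan struct{}
	closeOnce sync.Once
}

func newChannel() *channel {
	return &channel{in: make(chan string, 1), done: make(chan struct{})}
}

// Close signals closure; sync.Once makes repeated calls safe.
func (c *channel) Close() { c.closeOnce.Do(func() { close(c.done) }) }

// Done returns a channel that is closed once Close has been called.
func (c *channel) Done() <-chan struct{} { return c.done }

// deliver hands msg to the channel unless it has been closed. It reports
// whether the message was accepted.
func deliver(c *channel, msg string) bool {
	// Check for closure first so a closed channel never accepts new messages.
	select {
	case <-c.Done():
		return false
	default:
	}
	select {
	case c.in <- msg:
		return true
	case <-c.Done():
		return false
	}
}

func main() {
	c := newChannel()
	fmt.Println(deliver(c, "first")) // true: open, with free buffer space
	c.Close()
	fmt.Println(deliver(c, "second")) // false: Done() is closed
}
```

Because the sender never closes or depends on the closing of the inbound channel, no extra synchronization between senders is needed in this sketch.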

type ChannelDescriptor

type ChannelDescriptor = conn.ChannelDescriptor

type ChannelDescriptorShim

type ChannelDescriptorShim struct {
	MsgType    proto.Message
	Descriptor *ChannelDescriptor
}

ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel and the proto.Message the new p2p Channel is responsible for handling. A ChannelDescriptorShim is not contained in ReactorShim, but is rather used to construct a ReactorShim.

type ChannelID

type ChannelID uint16

ChannelID is an arbitrary channel ID.

type ChannelShim

type ChannelShim struct {
	Descriptor *ChannelDescriptor
	Channel    *Channel
	// contains filtered or unexported fields
}

ChannelShim defines a generic shim wrapper around a legacy p2p channel and the new p2p Channel. It also includes the raw bi-directional Go channels so we can proxy message delivery.

func NewChannelShim

func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim

type ConnFilterFunc

type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error

ConnFilterFunc is a callback for connection filtering. If it returns an error, the connection is rejected. The set of existing connections is passed along with the new connection and all resolved IPs.

var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error {
	for _, ip := range ips {
		if cs.HasIP(ip) {
			return ErrRejected{
				conn:        c,
				err:         fmt.Errorf("ip<%v> already connected", ip),
				isDuplicate: true,
			}
		}
	}
	return nil
}

ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection and refuses new ones if they come from a known ip.

type ConnSet

type ConnSet interface {
	Has(net.Conn) bool
	HasIP(net.IP) bool
	Set(net.Conn, []net.IP)
	Remove(net.Conn)
	RemoveAddr(net.Addr)
}

ConnSet is a lookup table for connections and all their ips.

func NewConnSet

func NewConnSet() ConnSet

NewConnSet returns a ConnSet implementation.

type Connection

type Connection interface {
	// Handshake executes a node handshake with the remote peer. It must be
	// called immediately after the connection is established, and returns the
	// remote peer's node info and public key. The caller is responsible for
	// validation.
	//
	// FIXME: The handshake should really be the Router's responsibility, but
	// that requires the connection interface to be byte-oriented rather than
	// message-oriented (see comment above).
	Handshake(context.Context, NodeInfo, crypto.PrivKey) (NodeInfo, crypto.PubKey, error)

	// ReceiveMessage returns the next message received on the connection,
	// blocking until one is available. Returns io.EOF if closed.
	ReceiveMessage() (ChannelID, []byte, error)

	// SendMessage sends a message on the connection. Returns io.EOF if closed.
	//
	// FIXME: For compatibility with the legacy P2P stack, it returns an
	// additional boolean false if the message timed out waiting to be accepted
	// into the send buffer. This should be removed.
	SendMessage(ChannelID, []byte) (bool, error)

	// TrySendMessage is a non-blocking version of SendMessage that returns
	// immediately if the message buffer is full. It returns true if the message
	// was accepted.
	//
	// FIXME: This method is here for backwards-compatibility with the legacy
	// P2P stack and should be removed.
	TrySendMessage(ChannelID, []byte) (bool, error)

	// LocalEndpoint returns the local endpoint for the connection.
	LocalEndpoint() Endpoint

	// RemoteEndpoint returns the remote endpoint for the connection.
	RemoteEndpoint() Endpoint

	// Close closes the connection.
	Close() error

	// FlushClose flushes all pending sends and then closes the connection.
	//
	// FIXME: This only exists for backwards-compatibility with the current
	// MConnection implementation. There should really be a separate Flush()
	// method, but there is no easy way to synchronously flush pending data with
	// the current MConnection code.
	FlushClose() error

	// Status returns the current connection status.
	// FIXME: Only here for compatibility with the current Peer code.
	Status() conn.ConnectionStatus

	// Stringer is used to display the connection, e.g. in logs.
	//
	// Without this, the logger may use reflection to access and display
	// internal fields. These can be written to concurrently, which can trigger
	// the race detector or even cause a panic.
	fmt.Stringer
}

Connection represents an established connection between two endpoints.

FIXME: This is a temporary interface for backwards-compatibility with the current MConnection-protocol, which is message-oriented. It should be migrated to a byte-oriented multi-stream interface instead, which would allow e.g. adopting QUIC and making message framing, traffic scheduling, and node handshakes a Router concern shared across all transports. However, this requires MConnection protocol changes or a shim. For details, see: https://github.com/tendermint/spec/pull/227

FIXME: The interface is currently very broad in order to accommodate MConnection behavior that the legacy P2P stack relies on. It should be cleaned up when the legacy stack is removed.

type ConnectionStatus

type ConnectionStatus = conn.ConnectionStatus

type Endpoint

type Endpoint struct {
	// Protocol specifies the transport protocol.
	Protocol Protocol

	// IP is an IP address (v4 or v6) to connect to. If set, this defines the
	// endpoint as a networked endpoint.
	IP net.IP

	// Port is a network port (either TCP or UDP). If 0, a default port may be
	// used depending on the protocol.
	Port uint16

	// Path is an optional transport-specific path or identifier.
	Path string
}

Endpoint represents a transport connection endpoint, either local or remote.

Endpoints are not necessarily networked (see e.g. MemoryTransport) but all networked endpoints must use IP as the underlying transport protocol to allow e.g. IP address filtering. Either IP or Path (or both) must be set.

func (Endpoint) NodeAddress

func (e Endpoint) NodeAddress(nodeID NodeID) NodeAddress

NodeAddress converts the endpoint into a NodeAddress for the given node ID.

func (Endpoint) String

func (e Endpoint) String() string

String formats the endpoint as a URL string.

func (Endpoint) Validate

func (e Endpoint) Validate() error

Validate validates the endpoint.

type Envelope

type Envelope struct {
	From      NodeID        // sender (empty if outbound)
	To        NodeID        // receiver (empty if inbound)
	Broadcast bool          // send to all connected peers (ignores To)
	Message   proto.Message // message payload
	// contains filtered or unexported fields
}

Envelope contains a message with sender/receiver routing info.

type ErrCurrentlyDialingOrExistingAddress

type ErrCurrentlyDialingOrExistingAddress struct {
	Addr string
}

ErrCurrentlyDialingOrExistingAddress indicates that we're currently dialing this address or it belongs to an existing peer.

func (ErrCurrentlyDialingOrExistingAddress) Error

func (e ErrCurrentlyDialingOrExistingAddress) Error() string

type ErrFilterTimeout

type ErrFilterTimeout struct{}

ErrFilterTimeout indicates that a filter operation timed out.

func (ErrFilterTimeout) Error

func (e ErrFilterTimeout) Error() string

type ErrNetAddressInvalid

type ErrNetAddressInvalid struct {
	Addr string
	Err  error
}

func (ErrNetAddressInvalid) Error

func (e ErrNetAddressInvalid) Error() string

type ErrNetAddressLookup

type ErrNetAddressLookup struct {
	Addr string
	Err  error
}

func (ErrNetAddressLookup) Error

func (e ErrNetAddressLookup) Error() string

type ErrNetAddressNoID

type ErrNetAddressNoID struct {
	Addr string
}

func (ErrNetAddressNoID) Error

func (e ErrNetAddressNoID) Error() string

type ErrRejected

type ErrRejected struct {
	// contains filtered or unexported fields
}

ErrRejected indicates that a Peer was rejected and carries additional information about the reason.

func (ErrRejected) Addr

func (e ErrRejected) Addr() NetAddress

Addr returns the NetAddress for the rejected Peer.

func (ErrRejected) Error

func (e ErrRejected) Error() string

func (ErrRejected) IsAuthFailure

func (e ErrRejected) IsAuthFailure() bool

IsAuthFailure reports whether the Peer was rejected because authentication was unsuccessful.

func (ErrRejected) IsDuplicate

func (e ErrRejected) IsDuplicate() bool

IsDuplicate reports whether the Peer was rejected because its ID or IP is already present.

func (ErrRejected) IsFiltered

func (e ErrRejected) IsFiltered() bool

IsFiltered reports whether the Peer was rejected because its ID or IP was filtered.

func (ErrRejected) IsIncompatible

func (e ErrRejected) IsIncompatible() bool

IsIncompatible reports whether the Peer was rejected because its NodeInfo is not compatible with our own.

func (ErrRejected) IsNodeInfoInvalid

func (e ErrRejected) IsNodeInfoInvalid() bool

IsNodeInfoInvalid reports whether the Peer was rejected because the NodeInfo it sent is not valid.

func (ErrRejected) IsSelf

func (e ErrRejected) IsSelf() bool

IsSelf reports whether the Peer was rejected because it is our own node.

type ErrSwitchAuthenticationFailure

type ErrSwitchAuthenticationFailure struct {
	Dialed *NetAddress
	Got    NodeID
}

func (ErrSwitchAuthenticationFailure) Error

func (e ErrSwitchAuthenticationFailure) Error() string

type ErrSwitchConnectToSelf

type ErrSwitchConnectToSelf struct {
	Addr *NetAddress
}

ErrSwitchConnectToSelf is raised when a node tries to connect to itself.

func (ErrSwitchConnectToSelf) Error

func (e ErrSwitchConnectToSelf) Error() string

type ErrSwitchDuplicatePeerID

type ErrSwitchDuplicatePeerID struct {
	ID NodeID
}

ErrSwitchDuplicatePeerID is raised when a peer connects with a known ID.

func (ErrSwitchDuplicatePeerID) Error

func (e ErrSwitchDuplicatePeerID) Error() string

type ErrSwitchDuplicatePeerIP

type ErrSwitchDuplicatePeerIP struct {
	IP net.IP
}

ErrSwitchDuplicatePeerIP is raised when a peer connects with a known IP.

func (ErrSwitchDuplicatePeerIP) Error

func (e ErrSwitchDuplicatePeerIP) Error() string

type ErrTransportClosed

type ErrTransportClosed struct{}

ErrTransportClosed is raised when the Transport has been closed.

func (ErrTransportClosed) Error

func (e ErrTransportClosed) Error() string

type IPeerSet

type IPeerSet interface {
	Has(key NodeID) bool
	HasIP(ip net.IP) bool
	Get(key NodeID) Peer
	List() []Peer
	Size() int
}

IPeerSet has an (immutable) subset of the methods of PeerSet.

type MConnTransport

type MConnTransport struct {
	// contains filtered or unexported fields
}

MConnTransport is a Transport implementation using the current multiplexed Tendermint protocol ("MConn").

func NewMConnTransport

func NewMConnTransport(
	logger log.Logger,
	mConnConfig conn.MConnConfig,
	channelDescs []*ChannelDescriptor,
	options MConnTransportOptions,
) *MConnTransport

NewMConnTransport sets up a new MConnection transport. This uses the proprietary Tendermint MConnection protocol, which is implemented as conn.MConnection.

func (*MConnTransport) Accept

func (m *MConnTransport) Accept() (Connection, error)

Accept implements Transport.

func (*MConnTransport) Close

func (m *MConnTransport) Close() error

Close implements Transport.

func (*MConnTransport) Dial

func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error)

Dial implements Transport.

func (*MConnTransport) Endpoints

func (m *MConnTransport) Endpoints() []Endpoint

Endpoints implements Transport.

func (*MConnTransport) Listen

func (m *MConnTransport) Listen(endpoint Endpoint) error

Listen asynchronously listens for inbound connections on the given endpoint. It must be called exactly once before calling Accept(), and the caller must call Close() to shut down the listener.

FIXME: Listen currently only supports listening on a single endpoint. It might be useful to support listening on multiple addresses (e.g. IPv4 and IPv6, or a private and public address) via multiple Listen() calls.

func (*MConnTransport) Protocols

func (m *MConnTransport) Protocols() []Protocol

Protocols implements Transport. We support tcp for backwards-compatibility.

func (*MConnTransport) String

func (m *MConnTransport) String() string

String implements Transport.

type MConnTransportOptions

type MConnTransportOptions struct {
	// MaxAcceptedConnections is the maximum number of simultaneous accepted
	// (incoming) connections. Beyond this, new connections will block until
	// a slot is free. 0 means unlimited.
	//
	// FIXME: We may want to replace this with connection accounting in the
	// Router, since it will need to do e.g. rate limiting and such as well.
	// But it might also make sense to have per-transport limits.
	MaxAcceptedConnections uint32
}

MConnTransportOptions sets options for MConnTransport.

type MemoryConnection

type MemoryConnection struct {
	// contains filtered or unexported fields
}

MemoryConnection is an in-memory connection between two transport endpoints.

func (*MemoryConnection) Close

func (c *MemoryConnection) Close() error

Close implements Connection.

func (*MemoryConnection) FlushClose

func (c *MemoryConnection) FlushClose() error

FlushClose implements Connection.

func (*MemoryConnection) Handshake

func (c *MemoryConnection) Handshake(
	ctx context.Context,
	nodeInfo NodeInfo,
	privKey crypto.PrivKey,
) (NodeInfo, crypto.PubKey, error)

Handshake implements Connection.

func (*MemoryConnection) LocalEndpoint

func (c *MemoryConnection) LocalEndpoint() Endpoint

LocalEndpoint implements Connection.

func (*MemoryConnection) ReceiveMessage

func (c *MemoryConnection) ReceiveMessage() (ChannelID, []byte, error)

ReceiveMessage implements Connection.

func (*MemoryConnection) RemoteEndpoint

func (c *MemoryConnection) RemoteEndpoint() Endpoint

RemoteEndpoint implements Connection.

func (*MemoryConnection) SendMessage

func (c *MemoryConnection) SendMessage(chID ChannelID, msg []byte) (bool, error)

SendMessage implements Connection.

func (*MemoryConnection) Status

func (c *MemoryConnection) Status() conn.ConnectionStatus

Status implements Connection.

func (*MemoryConnection) String

func (c *MemoryConnection) String() string

String implements Connection.

func (*MemoryConnection) TrySendMessage

func (c *MemoryConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error)

TrySendMessage implements Connection.

type MemoryNetwork

type MemoryNetwork struct {
	// contains filtered or unexported fields
}

MemoryNetwork is an in-memory "network" that uses buffered Go channels to communicate between endpoints. It is primarily meant for testing.

Network endpoints are allocated via CreateTransport(), which takes a node ID, and the endpoint is then immediately accessible via the URL "memory:<nodeID>".

func NewMemoryNetwork

func NewMemoryNetwork(logger log.Logger) *MemoryNetwork

NewMemoryNetwork creates a new in-memory network.

func (*MemoryNetwork) CreateTransport

func (n *MemoryNetwork) CreateTransport(nodeID NodeID) *MemoryTransport

CreateTransport creates a new memory transport endpoint with the given node ID and immediately begins listening on the address "memory:<id>". It panics if the node ID is already in use (which is fine, since this is for tests).

func (*MemoryNetwork) GetTransport

func (n *MemoryNetwork) GetTransport(id NodeID) *MemoryTransport

GetTransport looks up a transport in the network, returning nil if not found.

func (*MemoryNetwork) RemoveTransport

func (n *MemoryNetwork) RemoveTransport(id NodeID)

RemoveTransport removes a transport from the network and closes it.

func (*MemoryNetwork) Size

func (n *MemoryNetwork) Size() int

Size returns the number of transports in the network.

type MemoryTransport

type MemoryTransport struct {
	// contains filtered or unexported fields
}

MemoryTransport is an in-memory transport that uses buffered Go channels to communicate between endpoints. It is primarily meant for testing.

New transports are allocated with MemoryNetwork.CreateTransport(). To contact a different endpoint, both transports must be in the same MemoryNetwork.

func (*MemoryTransport) Accept

func (t *MemoryTransport) Accept() (Connection, error)

Accept implements Transport.

func (*MemoryTransport) Close

func (t *MemoryTransport) Close() error

Close implements Transport.

func (*MemoryTransport) Dial

func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error)

Dial implements Transport.

func (*MemoryTransport) Endpoints

func (t *MemoryTransport) Endpoints() []Endpoint

Endpoints implements Transport.

func (*MemoryTransport) Protocols

func (t *MemoryTransport) Protocols() []Protocol

Protocols implements Transport.

func (*MemoryTransport) String

func (t *MemoryTransport) String() string

String implements Transport.

type Metrics

type Metrics struct {
	// Number of peers.
	Peers metrics.Gauge
	// Number of bytes received from a given peer.
	PeerReceiveBytesTotal metrics.Counter
	// Number of bytes sent to a given peer.
	PeerSendBytesTotal metrics.Counter
	// Pending bytes to be sent to a given peer.
	PeerPendingSendBytes metrics.Gauge
	// Number of transactions submitted by each peer.
	NumTxs metrics.Gauge
}

Metrics contains metrics exposed by this package.

func NopMetrics

func NopMetrics() *Metrics

NopMetrics returns no-op Metrics.

func PrometheusMetrics

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics

PrometheusMetrics returns Metrics built using the Prometheus client library. Optionally, labels can be provided along with their values ("foo", "fooValue").

type NetAddress

type NetAddress struct {
	ID   NodeID `json:"id"`
	IP   net.IP `json:"ip"`
	Port uint16 `json:"port"`
}

NetAddress defines information about a peer on the network including its ID, IP address, and port.

func CreateRoutableAddr

func CreateRoutableAddr() (addr string, netAddr *NetAddress)

func NetAddressFromProto

func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error)

NetAddressFromProto converts a Protobuf PexAddress into a native struct. FIXME: Remove this when legacy PEX reactor is removed.

func NetAddressesFromProto

func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error)

NetAddressesFromProto converts a slice of Protobuf PexAddresses into a native slice. FIXME: Remove this when legacy PEX reactor is removed.

func NewNetAddress

func NewNetAddress(id NodeID, addr net.Addr) *NetAddress

NewNetAddress returns a new NetAddress using the provided TCP address. In tests, any non-TCP net.Addr results in the address 0.0.0.0:0; in normal operation, a non-TCP net.Addr causes a panic. It also panics if the ID is invalid. TODO: socks proxies?

func NewNetAddressIPPort

func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress

NewNetAddressIPPort returns a new NetAddress using the provided IP and port number.

func NewNetAddressString

func NewNetAddressString(addr string) (*NetAddress, error)

NewNetAddressString returns a new NetAddress using the provided address in the form of "ID@IP:Port". It also resolves the host if the host is not an IP. Errors are of type ErrNetAddressXxx, where Xxx is one of NoID, Invalid, or Lookup.
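The "ID@IP:Port" form can be taken apart with the standard library, as in the sketch below. The parsedAddr type and parseIDAddr function are hypothetical; the sketch returns plain errors rather than the ErrNetAddressXxx types, and it neither validates the ID nor resolves host names.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
)

// parsedAddr is a hypothetical holder for the three parts of "ID@IP:Port".
type parsedAddr struct {
	ID   string
	Host string
	Port uint16
}

// parseIDAddr splits addr into ID, host, and port. No DNS lookup is done.
func parseIDAddr(addr string) (parsedAddr, error) {
	parts := strings.SplitN(addr, "@", 2)
	if len(parts) != 2 || parts[0] == "" {
		return parsedAddr{}, errors.New("address has no ID: " + addr)
	}
	host, portStr, err := net.SplitHostPort(parts[1])
	if err != nil {
		return parsedAddr{}, fmt.Errorf("invalid host:port in %q: %w", addr, err)
	}
	// bitSize 16 rejects ports that do not fit into a uint16.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return parsedAddr{}, fmt.Errorf("invalid port in %q: %w", addr, err)
	}
	return parsedAddr{ID: parts[0], Host: host, Port: uint16(port)}, nil
}

func main() {
	a, err := parseIDAddr("deadbeef@127.0.0.1:26656")
	fmt.Println(a.ID, a.Host, a.Port, err) // deadbeef 127.0.0.1 26656 <nil>

	_, err = parseIDAddr("127.0.0.1:26656")
	fmt.Println(err != nil) // true: the ID is missing
}
```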

func NewNetAddressStrings

func NewNetAddressStrings(addrs []string) ([]*NetAddress, []error)

NewNetAddressStrings returns a slice of NetAddresses built from the provided strings.

func (*NetAddress) Dial

func (na *NetAddress) Dial() (net.Conn, error)

Dial calls net.Dial on the address.

func (*NetAddress) DialString

func (na *NetAddress) DialString() string

func (*NetAddress) DialTimeout

func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error)

DialTimeout calls net.DialTimeout on the address.

func (*NetAddress) Endpoint

func (na *NetAddress) Endpoint() Endpoint

Endpoint converts the address to an MConnection endpoint.

func (*NetAddress) Equals

func (na *NetAddress) Equals(other interface{}) bool

Equals reports whether na and other are the same addresses, including their ID, IP, and Port.

func (*NetAddress) HasID

func (na *NetAddress) HasID() bool

HasID returns true if the address has an ID. NOTE: It does not check whether the ID is valid or not.

func (*NetAddress) Local

func (na *NetAddress) Local() bool

Local returns true if it is a local address.

func (*NetAddress) OnionCatTor

func (na *NetAddress) OnionCatTor() bool

func (*NetAddress) RFC1918

func (na *NetAddress) RFC1918() bool

func (*NetAddress) RFC3849

func (na *NetAddress) RFC3849() bool

func (*NetAddress) RFC3927

func (na *NetAddress) RFC3927() bool

func (*NetAddress) RFC3964

func (na *NetAddress) RFC3964() bool

func (*NetAddress) RFC4193

func (na *NetAddress) RFC4193() bool

func (*NetAddress) RFC4380

func (na *NetAddress) RFC4380() bool

func (*NetAddress) RFC4843

func (na *NetAddress) RFC4843() bool

func (*NetAddress) RFC4862

func (na *NetAddress) RFC4862() bool

func (*NetAddress) RFC6052

func (na *NetAddress) RFC6052() bool

func (*NetAddress) RFC6145

func (na *NetAddress) RFC6145() bool

func (*NetAddress) ReachabilityTo

func (na *NetAddress) ReachabilityTo(o *NetAddress) int

ReachabilityTo checks whether o can be reached from na.

func (*NetAddress) Routable

func (na *NetAddress) Routable() bool

Routable returns true if the address is routable.

func (*NetAddress) Same

func (na *NetAddress) Same(other interface{}) bool

Same returns true if na has the same non-empty ID or DialString as other.

func (*NetAddress) String

func (na *NetAddress) String() string

String returns the string representation of the address: <ID>@<IP>:<PORT>

func (*NetAddress) ToProto

func (na *NetAddress) ToProto() tmp2p.PexAddress

ToProto converts a NetAddress to a Protobuf PexAddress. FIXME: Remove this when legacy PEX reactor is removed.

func (*NetAddress) Valid

func (na *NetAddress) Valid() error

Valid returns an error if the address is invalid. For IPv4, invalid addresses are either the zero address or the all-bits-set address. For IPv6, they are the zero address or one that matches the RFC3849 documentation address format.

type NodeAddress

type NodeAddress struct {
	NodeID   NodeID
	Protocol Protocol
	Hostname string
	Port     uint16
	Path     string
}

NodeAddress is a node address URL. It differs from a transport Endpoint in that it contains the node's ID, and that the address hostname may be resolved into multiple IP addresses (and thus multiple endpoints).

If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part is expected to contain a node ID.

func ParseNodeAddress

func ParseNodeAddress(urlString string) (NodeAddress, error)

ParseNodeAddress parses a node address URL into a NodeAddress, normalizing and validating it.
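As a rough sketch of how a node ID might be pulled out of such a URL, the example below uses net/url. The function nodeIDFromURL is hypothetical. The opaque form "scheme:opaque" follows the description above; treating the URL's userinfo part as the node ID in the non-opaque form is an assumption made for this sketch and is not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// nodeIDFromURL extracts the protocol and node ID from a node address URL.
// For opaque URLs ("scheme:opaque") the opaque part is taken as the node ID.
// For other URLs the userinfo part is used, which is an assumption.
func nodeIDFromURL(raw string) (protocol, nodeID string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.Scheme == "" {
		return "", "", errors.New("no protocol in " + raw)
	}
	if u.Opaque != "" {
		return u.Scheme, u.Opaque, nil
	}
	if u.User == nil || u.User.Username() == "" {
		return "", "", errors.New("no node ID in " + raw)
	}
	return u.Scheme, u.User.Username(), nil
}

func main() {
	fmt.Println(nodeIDFromURL("memory:deadbeef"))                    // memory deadbeef <nil>
	fmt.Println(nodeIDFromURL("mconn://deadbeef@example.com:26656")) // mconn deadbeef <nil>
}
```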

func (NodeAddress) Resolve

func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error)

Resolve resolves a NodeAddress into a set of Endpoints, by expanding out a DNS hostname to IP addresses.

func (NodeAddress) String

func (a NodeAddress) String() string

String formats the address as a URL string.

func (NodeAddress) Validate

func (a NodeAddress) Validate() error

Validate validates a NodeAddress.

type NodeID

type NodeID string

NodeID is a hex-encoded crypto.Address. It must be lowercased (for uniqueness) and of length 2*NodeIDByteLength.

func NewNodeID

func NewNodeID(nodeID string) (NodeID, error)

NewNodeID returns a lowercased (normalized) NodeID, or errors if the node ID is invalid.
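The normalization and validation rules stated for NodeID can be sketched as below. The function newNodeID and the constant nodeIDByteLength are hypothetical stand-ins operating on plain strings; the value 20 is taken from the NodeIDByteLength comment in the constants section.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// nodeIDByteLength mirrors the documented address size of 20 bytes.
const nodeIDByteLength = 20

// newNodeID lowercases s and checks that it is hex of the expected length.
// It is a hypothetical sketch, not the package's NewNodeID.
func newNodeID(s string) (string, error) {
	id := strings.ToLower(s)
	if len(id) != 2*nodeIDByteLength {
		return "", fmt.Errorf("invalid node ID length %d, expected %d", len(id), 2*nodeIDByteLength)
	}
	if _, err := hex.DecodeString(id); err != nil {
		return "", fmt.Errorf("node ID is not valid hex: %w", err)
	}
	return id, nil
}

func main() {
	id, err := newNodeID("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD")
	fmt.Println(id, err) // abcdefabcdefabcdefabcdefabcdefabcdefabcd <nil>

	_, err = newNodeID("abc")
	fmt.Println(err != nil) // true: wrong length
}
```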

func NodeIDFromPubKey

func NodeIDFromPubKey(pubKey crypto.PubKey) NodeID

NodeIDFromPubKey creates a node ID from a given PubKey address.

func (NodeID) Bytes

func (id NodeID) Bytes() ([]byte, error)

Bytes converts the node ID to its binary byte representation.

func (NodeID) Validate

func (id NodeID) Validate() error

Validate validates the NodeID.

type NodeInfo

type NodeInfo struct {
	ProtocolVersion ProtocolVersion `json:"protocol_version"`

	// Authenticate
	// TODO: replace with NetAddress
	NodeID     NodeID `json:"id"`          // authenticated identifier
	ListenAddr string `json:"listen_addr"` // accepting incoming

	// Check compatibility.
	// Channels are HexBytes so easier to read as JSON
	Network  string         `json:"network"`  // network/chain ID
	Version  string         `json:"version"`  // major.minor.revision
	Channels bytes.HexBytes `json:"channels"` // channels this node knows about

	// ASCIIText fields
	Moniker string        `json:"moniker"` // arbitrary moniker
	Other   NodeInfoOther `json:"other"`   // other application specific data
}

NodeInfo is the basic node information exchanged between two peers during the Tendermint P2P handshake.

func NodeInfoFromProto

func NodeInfoFromProto(pb *tmp2p.NodeInfo) (NodeInfo, error)

func (NodeInfo) CompatibleWith

func (info NodeInfo) CompatibleWith(other NodeInfo) error

CompatibleWith checks if two NodeInfo are compatible with each other. CONTRACT: two nodes are compatible if the Block version and network match and they have at least one channel in common.
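The stated contract can be written out as a small sketch. The nodeInfo type, its fields, and compatibleWith are hypothetical simplifications that implement only the three conditions named above; any special cases in the package's own method are not reproduced.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeInfo is a hypothetical, reduced version of NodeInfo carrying only the
// fields the compatibility contract mentions.
type nodeInfo struct {
	BlockVersion uint64
	Network      string
	Channels     []byte
}

// compatibleWith returns nil if the block version and network match and the
// two nodes share at least one channel.
func (info nodeInfo) compatibleWith(other nodeInfo) error {
	if info.BlockVersion != other.BlockVersion {
		return fmt.Errorf("block version mismatch: %d vs %d", info.BlockVersion, other.BlockVersion)
	}
	if info.Network != other.Network {
		return fmt.Errorf("network mismatch: %q vs %q", info.Network, other.Network)
	}
	for _, a := range info.Channels {
		for _, b := range other.Channels {
			if a == b {
				return nil
			}
		}
	}
	return errors.New("no common channels")
}

func main() {
	a := nodeInfo{BlockVersion: 11, Network: "test-chain", Channels: []byte{0x20, 0x30}}
	b := nodeInfo{BlockVersion: 11, Network: "test-chain", Channels: []byte{0x30, 0x40}}
	c := nodeInfo{BlockVersion: 11, Network: "other-chain", Channels: []byte{0x30}}

	fmt.Println(a.compatibleWith(b)) // <nil>
	fmt.Println(a.compatibleWith(c)) // network mismatch: "test-chain" vs "other-chain"
}
```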

func (NodeInfo) ID

func (info NodeInfo) ID() NodeID

ID returns the node's peer ID.

func (NodeInfo) NetAddress

func (info NodeInfo) NetAddress() (*NetAddress, error)

NetAddress returns a NetAddress derived from the NodeInfo. It includes the authenticated peer ID and the self-reported ListenAddr. Note that the ListenAddr is not authenticated and may not match the address actually dialed if it's an outbound peer.

func (NodeInfo) ToProto

func (info NodeInfo) ToProto() *tmp2p.NodeInfo

func (NodeInfo) Validate

func (info NodeInfo) Validate() error

Validate checks that the self-reported NodeInfo is safe. It returns an error if there are too many Channels, if there are any duplicate Channels, if the ListenAddr is malformed, or if the ListenAddr is a host name that cannot be resolved to an IP.

TODO: constraints for Moniker/Other? Or is that for the UI? JAE: It needs to be done on the client, but to prevent ambiguous Unicode characters, maybe it's worth sanitizing it here. In the future we might want to validate these, once we have a name-resolution system up. International clients could then use punycode (or we could use URL-encoding), and we just need to be careful with how we handle that in our clients (e.g. off by default).

type NodeInfoOther

type NodeInfoOther struct {
	TxIndex    string `json:"tx_index"`
	RPCAddress string `json:"rpc_address"`
}

NodeInfoOther contains miscellaneous application-specific data.

type NodeKey

type NodeKey struct {
	// Canonical ID - hex-encoded pubkey's address (IDByteLength bytes)
	ID NodeID `json:"id"`
	// Private key
	PrivKey crypto.PrivKey `json:"priv_key"`
}

NodeKey is the persistent peer key. It contains the node's private key for authentication.

func GenNodeKey

func GenNodeKey() NodeKey

GenNodeKey generates a new node key.

func LoadNodeKey

func LoadNodeKey(filePath string) (NodeKey, error)

LoadNodeKey loads NodeKey located in filePath.

func LoadOrGenNodeKey

func LoadOrGenNodeKey(filePath string) (NodeKey, error)

LoadOrGenNodeKey attempts to load the NodeKey from the given filePath. If the file does not exist, it generates and saves a new NodeKey.

func (NodeKey) PubKey

func (nodeKey NodeKey) PubKey() crypto.PubKey

PubKey returns the peer's PubKey

func (NodeKey) SaveAs

func (nodeKey NodeKey) SaveAs(filePath string) error

SaveAs persists the NodeKey to filePath.

type Peer

type Peer interface {
	service.Service
	FlushStop()

	ID() NodeID           // peer's cryptographic ID
	RemoteIP() net.IP     // remote IP of the connection
	RemoteAddr() net.Addr // remote address of the connection

	IsOutbound() bool   // did we dial the peer
	IsPersistent() bool // do we redial this peer when we disconnect

	CloseConn() error // close original connection

	NodeInfo() NodeInfo // peer's info
	Status() tmconn.ConnectionStatus
	SocketAddr() *NetAddress // actual address of the socket

	Send(byte, []byte) bool
	TrySend(byte, []byte) bool

	Set(string, interface{})
	Get(string) interface{}
}

Peer is an interface representing a peer connected on a reactor.

func CreateRandomPeer

func CreateRandomPeer(outbound bool) Peer

type PeerError

type PeerError struct {
	NodeID NodeID
	Err    error
}

PeerError is a peer error reported via Channel.Error.

FIXME: This currently just disconnects the peer, which is too simplistic. For example, some errors should be logged, some should cause disconnects, and some should ban the peer.

FIXME: This should probably be replaced by a more general PeerBehavior concept that can mark good and bad behavior and contributes to peer scoring. It should possibly also allow reactors to request explicit actions, e.g. disconnection or banning, in addition to doing this based on aggregates.

type PeerFilterFunc

type PeerFilterFunc func(IPeerSet, Peer) error

PeerFilterFunc is implemented by filter hooks, which run after a new Peer has been fully set up.

type PeerManager

type PeerManager struct {
	// contains filtered or unexported fields
}

PeerManager manages peer lifecycle information, using a peerStore for underlying storage. Its primary purpose is to determine which peer to connect to next (including retry timers), make sure a peer only has a single active connection (either inbound or outbound), and evict peers to make room for higher-scored peers. It does not manage actual connections (this is handled by the Router), only the peer lifecycle state.

For an outbound connection, the flow is as follows:

  • DialNext: return a peer address to dial, mark peer as dialing.
  • DialFailed: report a dial failure, unmark as dialing.
  • Dialed: report a dial success, unmark as dialing and mark as connected (errors if already connected, e.g. by Accepted).
  • Ready: report routing is ready, mark as ready and broadcast PeerStatusUp.
  • Disconnected: report peer disconnect, unmark as connected and broadcast PeerStatusDown.

For an inbound connection, the flow is as follows:

  • Accepted: report inbound connection success, mark as connected (errors if already connected, e.g. by Dialed).
  • Ready: report routing is ready, mark as ready and broadcast PeerStatusUp.
  • Disconnected: report peer disconnect, unmark as connected and broadcast PeerStatusDown.

When evicting peers, either because peers are explicitly scheduled for eviction or we are connected to too many peers, the flow is as follows:

  • EvictNext: if marked evict and connected, unmark evict and mark evicting. If beyond MaxConnected, pick lowest-scored peer and mark evicting.
  • Disconnected: unmark connected, evicting, evict, and broadcast a PeerStatusDown peer update.

If all connection slots are full (at MaxConnected), we can use up to MaxConnectedUpgrade additional connections to probe any higher-scored unconnected peers, and if we reach them (or they reach us) we allow the connection and evict a lower-scored peer. We mark the lower-scored peer as upgrading[from]=to to make sure no other higher-scored peers can claim the same one for an upgrade. The flow is as follows:

  • Accepted: if upgrade is possible, mark connected and add lower-scored to evict.
  • DialNext: if upgrade is possible, mark upgrading[from]=to and dialing.
  • DialFailed: unmark upgrading[from]=to and dialing.
  • Dialed: unmark upgrading[from]=to and dialing, mark as connected, add lower-scored to evict.
  • EvictNext: pick peer from evict, mark as evicting.
  • Disconnected: unmark connected, upgrading[from]=to, evict, evicting.

FIXME: The old stack supports ABCI-based peer ID filtering via /p2p/filter/id/<ID> queries, we should implement this here as well by taking a peer ID filtering callback in PeerManagerOptions and configuring it during Node setup.

func NewPeerManager

func NewPeerManager(selfID NodeID, peerDB dbm.DB, options PeerManagerOptions) (*PeerManager, error)

NewPeerManager creates a new peer manager.

func (*PeerManager) Accepted

func (m *PeerManager) Accepted(peerID NodeID) error

Accepted marks an incoming peer connection successfully accepted. If the peer is already connected or we don't allow additional connections then this will return an error.

If full but MaxConnectedUpgrade is non-zero and the incoming peer is better-scored than any existing peers, then we accept it and evict a lower-scored peer.

NOTE: We can't take an address here, since e.g. TCP uses a different port number for outbound traffic than inbound traffic, so the peer's endpoint wouldn't necessarily be an appropriate address to dial.

FIXME: When we accept a connection from a peer, we should register that peer's address in the peer store so that we can dial it later. In order to do that, we'll need to get the remote address after all, but as noted above that can't be the remote endpoint since that will usually have the wrong port number.

func (*PeerManager) Add

func (m *PeerManager) Add(address NodeAddress) error

Add adds a peer to the manager, given as an address. If the peer already exists, the address is added to it if not already present.

func (*PeerManager) Addresses

func (m *PeerManager) Addresses(peerID NodeID) []NodeAddress

Addresses returns all known addresses for a peer, primarily for testing. The order is arbitrary.

func (*PeerManager) Advertise

func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []NodeAddress

Advertise returns a list of peer addresses to advertise to a peer.

FIXME: This is fairly naïve and only returns the addresses of the highest-ranked peers.

func (*PeerManager) Close

func (m *PeerManager) Close()

Close closes the peer manager, releasing resources (i.e. goroutines).

func (*PeerManager) DialFailed

func (m *PeerManager) DialFailed(address NodeAddress) error

DialFailed reports a failed dial attempt. This will make the peer available for dialing again when appropriate (possibly after a retry timeout).

FIXME: This should probably delete or mark bad addresses/peers after some time.

func (*PeerManager) DialNext

func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error)

DialNext finds an appropriate peer address to dial, and marks it as dialing. If no peer is found, or all connection slots are full, it blocks until one becomes available. The caller must call Dialed() or DialFailed() for the returned peer.

func (*PeerManager) Dialed

func (m *PeerManager) Dialed(address NodeAddress) error

Dialed marks a peer as successfully dialed. Any further connections will be rejected, and once disconnected the peer may be dialed again.

func (*PeerManager) Disconnected

func (m *PeerManager) Disconnected(peerID NodeID) error

Disconnected unmarks a peer as connected, allowing it to be dialed or accepted again as appropriate.

func (*PeerManager) Errored

func (m *PeerManager) Errored(peerID NodeID, err error) error

Errored reports a peer error, causing the peer to be evicted if it's currently connected.

FIXME: This should probably be replaced with a peer behavior API, see PeerError comments for more details.

FIXME: This will cause the peer manager to immediately try to reconnect to the peer, which is probably not always what we want.

func (*PeerManager) EvictNext

func (m *PeerManager) EvictNext(ctx context.Context) (NodeID, error)

EvictNext returns the next peer to evict (i.e. disconnect). If no evictable peers are found, the call will block until one becomes available.

func (*PeerManager) GetHeight

func (m *PeerManager) GetHeight(peerID NodeID) int64

GetHeight returns a peer's height, as reported via SetHeight, or 0 if the peer or height is unknown.

FIXME: This is a temporary workaround to share state between the consensus and mempool reactors, carried over from the legacy P2P stack. Reactors should not have dependencies on each other, instead tracking this themselves.

func (*PeerManager) Peers

func (m *PeerManager) Peers() []NodeID

Peers returns all known peers, primarily for testing. The order is arbitrary.

func (*PeerManager) Ready

func (m *PeerManager) Ready(peerID NodeID) error

Ready marks a peer as ready, broadcasting status updates to subscribers. The peer must already be marked as connected. This is separate from Dialed() and Accepted() to allow the router to set up its internal queues before reactors start sending messages.

func (*PeerManager) Scores

func (m *PeerManager) Scores() map[NodeID]PeerScore

Scores returns the peer scores for all known peers, primarily for testing.

func (*PeerManager) SetHeight

func (m *PeerManager) SetHeight(peerID NodeID, height int64) error

SetHeight stores a peer's height, making it available via GetHeight.

FIXME: This is a temporary workaround to share state between the consensus and mempool reactors, carried over from the legacy P2P stack. Reactors should not have dependencies on each other, instead tracking this themselves.

func (*PeerManager) Status

func (m *PeerManager) Status(id NodeID) PeerStatus

Status returns the status for a peer, primarily for testing.

func (*PeerManager) Subscribe

func (m *PeerManager) Subscribe() *PeerUpdates

Subscribe subscribes to peer updates. The caller must consume the peer updates in a timely fashion and close the subscription when done, otherwise the PeerManager will halt.

func (*PeerManager) TryDialNext

func (m *PeerManager) TryDialNext() (NodeAddress, error)

TryDialNext is equivalent to DialNext(), but immediately returns an empty address if no peers or connection slots are available.

func (*PeerManager) TryEvictNext

func (m *PeerManager) TryEvictNext() (NodeID, error)

TryEvictNext is equivalent to EvictNext, but immediately returns an empty node ID if no evictable peers are found.

type PeerManagerOptions

type PeerManagerOptions struct {
	// PersistentPeers are peers that we want to maintain persistent connections
	// to. These will be scored higher than other peers, and if
	// MaxConnectedUpgrade is non-zero any lower-scored peers will be evicted if
	// necessary to make room for these.
	PersistentPeers []NodeID

	// MaxPeers is the maximum number of peers to track information about, i.e.
	// store in the peer store. When exceeded, the lowest-scored unconnected peers
	// will be deleted. 0 means no limit.
	MaxPeers uint16

	// MaxConnected is the maximum number of connected peers (inbound and
	// outbound). 0 means no limit.
	MaxConnected uint16

	// MaxConnectedUpgrade is the maximum number of additional connections to
	// use for probing any better-scored peers to upgrade to when all connection
	// slots are full. 0 disables peer upgrading.
	//
	// For example, if we are already connected to MaxConnected peers, but we
	// know or learn about better-scored peers (e.g. configured persistent
	// peers) that we are not connected to, then we can probe these peers by
	// using up to MaxConnectedUpgrade connections, and once connected evict the
	// lowest-scored connected peers. This also works for inbound connections,
	// i.e. if a higher-scored peer attempts to connect to us, we can accept
	// the connection and evict a lower-scored peer.
	MaxConnectedUpgrade uint16

	// MinRetryTime is the minimum time to wait between retries. Retry times
	// double for each retry, up to MaxRetryTime. 0 disables retries.
	MinRetryTime time.Duration

	// MaxRetryTime is the maximum time to wait between retries. 0 means
	// no maximum, in which case the retry time will keep doubling.
	MaxRetryTime time.Duration

	// MaxRetryTimePersistent is the maximum time to wait between retries for
	// peers listed in PersistentPeers. 0 uses MaxRetryTime instead.
	MaxRetryTimePersistent time.Duration

	// RetryTimeJitter is the upper bound of a random interval added to
	// retry times, to avoid thundering herds. 0 disables jitter.
	RetryTimeJitter time.Duration

	// PeerScores sets fixed scores for specific peers. It is mainly used
	// for testing. A score of 0 is ignored.
	PeerScores map[NodeID]PeerScore
	// contains filtered or unexported fields
}

PeerManagerOptions specifies options for a PeerManager.

func (*PeerManagerOptions) Validate

func (o *PeerManagerOptions) Validate() error

Validate validates the options.

type PeerOption

type PeerOption func(*peer)

func PeerMetrics

func PeerMetrics(metrics *Metrics) PeerOption

type PeerScore

type PeerScore uint8

PeerScore is a numeric score assigned to a peer (higher is better).

const (
	PeerScorePersistent PeerScore = 100 // persistent peers
)

type PeerSet

type PeerSet struct {
	// contains filtered or unexported fields
}

PeerSet is a special structure for keeping a table of peers. Iteration over the peers is super fast and thread-safe.

func NewPeerSet

func NewPeerSet() *PeerSet

NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.

func (*PeerSet) Add

func (ps *PeerSet) Add(peer Peer) error

Add adds the peer to the PeerSet. It returns an error carrying the reason, if the peer is already present.

func (*PeerSet) Get

func (ps *PeerSet) Get(peerKey NodeID) Peer

Get looks up a peer by the provided peerKey. Returns nil if peer is not found.

func (*PeerSet) Has

func (ps *PeerSet) Has(peerKey NodeID) bool

Has returns true if the set contains the peer referred to by this peerKey, otherwise false.

func (*PeerSet) HasIP

func (ps *PeerSet) HasIP(peerIP net.IP) bool

HasIP returns true if the set contains the peer referred to by this IP address, otherwise false.

func (*PeerSet) List

func (ps *PeerSet) List() []Peer

List returns the threadsafe list of peers.

func (*PeerSet) Remove

func (ps *PeerSet) Remove(peer Peer) bool

Remove discards the peer by its Key, if the peer was previously memoized. It returns true if the peer was removed, and false if it was not found in the set.

func (*PeerSet) Size

func (ps *PeerSet) Size() int

Size returns the number of unique items in the peerSet.

type PeerStatus

type PeerStatus string

PeerStatus is a peer status.

The peer manager has many more internal states for a peer (e.g. dialing, connected, evicting, and so on), which are tracked separately. PeerStatus is for external use outside of the peer manager.

const (
	PeerStatusUp   PeerStatus = "up"   // connected and ready
	PeerStatusDown PeerStatus = "down" // disconnected
)

type PeerUpdate

type PeerUpdate struct {
	NodeID NodeID
	Status PeerStatus
}

PeerUpdate is a peer update event sent via PeerUpdates.

type PeerUpdates

type PeerUpdates struct {
	// contains filtered or unexported fields
}

PeerUpdates is a peer update subscription with notifications about peer events (currently just status changes).

func NewPeerUpdates

func NewPeerUpdates(updatesCh chan PeerUpdate) *PeerUpdates

NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for internal use; callers should typically use PeerManager.Subscribe(). The subscriber must call Close() when done.

func (*PeerUpdates) Close

func (pu *PeerUpdates) Close()

Close closes the peer updates subscription.

func (*PeerUpdates) Done

func (pu *PeerUpdates) Done() <-chan struct{}

Done returns a channel that is closed when the subscription is closed.

func (*PeerUpdates) Updates

func (pu *PeerUpdates) Updates() <-chan PeerUpdate

Updates returns a channel for consuming peer updates.
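
A subscriber typically selects over Updates() and Done(). The sketch below shows that consumption loop with locally defined stand-ins: peerUpdate, peerUpdates, and consume are hypothetical types and functions that only mirror the method set documented above, and string IDs and statuses replace NodeID and PeerStatus.

```go
package main

import (
	"fmt"
	"sync"
)

// peerUpdate is a hypothetical stand-in for PeerUpdate.
type peerUpdate struct {
	NodeID string
	Status string // "up" or "down"
}

// peerUpdates is a hypothetical stand-in for PeerUpdates.
type peerUpdates struct {
	updatesCh chan peerUpdate
	doneCh    chan struct{}
	closeOnce sync.Once
}

func newPeerUpdates(ch chan peerUpdate) *peerUpdates {
	return &peerUpdates{updatesCh: ch, doneCh: make(chan struct{})}
}

func (pu *peerUpdates) Updates() <-chan peerUpdate { return pu.updatesCh }
func (pu *peerUpdates) Done() <-chan struct{}      { return pu.doneCh }

// Close is safe to call more than once in this sketch.
func (pu *peerUpdates) Close() { pu.closeOnce.Do(func() { close(pu.doneCh) }) }

// consume tracks which peers are up until the subscription is closed, then
// returns the final view.
func consume(pu *peerUpdates) map[string]bool {
	up := map[string]bool{}
	for {
		select {
		case u := <-pu.Updates():
			if u.Status == "up" {
				up[u.NodeID] = true
			} else {
				delete(up, u.NodeID)
			}
		case <-pu.Done():
			return up
		}
	}
}

func main() {
	pu := newPeerUpdates(make(chan peerUpdate))
	result := make(chan map[string]bool, 1)
	go func() { result <- consume(pu) }()

	// The channel is unbuffered, so each send returns only after the
	// consumer has received the update.
	pu.updatesCh <- peerUpdate{NodeID: "a", Status: "up"}
	pu.updatesCh <- peerUpdate{NodeID: "a", Status: "down"}
	pu.updatesCh <- peerUpdate{NodeID: "b", Status: "up"}
	pu.Close()

	fmt.Println("peers up:", len(<-result))
}
```

The loop does no slow work between receives, in keeping with the requirement that subscribers consume updates in a timely fashion.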

type Protocol

type Protocol string

Protocol identifies a transport protocol.

const (
	MConnProtocol Protocol = "mconn"
	TCPProtocol   Protocol = "tcp"
)
const (
	MemoryProtocol Protocol = "memory"
)

type ProtocolVersion

type ProtocolVersion struct {
	P2P   uint64 `json:"p2p"`
	Block uint64 `json:"block"`
	App   uint64 `json:"app"`
}

ProtocolVersion contains the protocol versions for the software.

func NewProtocolVersion

func NewProtocolVersion(p2p, block, app uint64) ProtocolVersion

NewProtocolVersion returns a fully populated ProtocolVersion.

type Reactor

type Reactor interface {
	service.Service // Start, Stop

	// SetSwitch allows setting a switch.
	SetSwitch(*Switch)

	// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
	// that each ID is unique across all the reactors added to the switch.
	GetChannels() []*conn.ChannelDescriptor

	// InitPeer is called by the switch before the peer is started. Use it to
	// initialize data for the peer (e.g. peer state).
	//
	// NOTE: The switch calls neither AddPeer nor RemovePeer if it fails to
	// start the peer. Do not store any data associated with the peer in the
	// reactor itself, or you may end up with state that is never cleaned up.
	InitPeer(peer Peer) Peer

	// AddPeer is called by the switch after the peer is added and successfully
	// started. Use it to start goroutines communicating with the peer.
	AddPeer(peer Peer)

	// RemovePeer is called by the switch when the peer is stopped (due to error
	// or other reason).
	RemovePeer(peer Peer, reason interface{})

	// Receive is called by the switch when msgBytes is received from the peer.
	//
	// NOTE: the reactor cannot keep msgBytes around after Receive completes
	// without copying.
	//
	// CONTRACT: msgBytes are not nil.
	//
	// XXX: do not call any methods that can block or incur heavy processing.
	// https://github.com/tendermint/tendermint/issues/2888
	Receive(chID byte, peer Peer, msgBytes []byte)
}

Reactor is responsible for handling incoming messages on one or more Channels. Switch calls GetChannels when the reactor is added to it. When a new peer joins our node, InitPeer and AddPeer are called. RemovePeer is called when the peer is stopped. Receive is called when a message is received on a channel associated with this reactor.

Peer#Send or Peer#TrySend should be used to send the message to a peer.
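
The note on Receive says msgBytes must be copied if it is kept after the call returns. A minimal sketch of that rule follows. The recorder type is hypothetical and implements only a simplified Receive (without the Peer argument), not the full Reactor interface.

```go
package main

import "fmt"

// recorder is a hypothetical reactor fragment that retains received messages.
type recorder struct {
	seen [][]byte
}

// Receive copies msgBytes before storing it, since the caller may reuse the
// underlying buffer after Receive returns.
func (r *recorder) Receive(chID byte, msgBytes []byte) {
	cp := make([]byte, len(msgBytes))
	copy(cp, msgBytes)
	r.seen = append(r.seen, cp)
}

func main() {
	r := &recorder{}
	buf := []byte("hello")
	r.Receive(0x01, buf)

	// Simulate the caller reusing its buffer for the next message.
	copy(buf, "world")

	fmt.Printf("stored: %s, buffer: %s\n", r.seen[0], buf)
}
```

Without the copy, the stored slice would alias the caller's buffer and change when the buffer is reused.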

type ReactorShim

type ReactorShim struct {
	BaseReactor

	Name        string
	PeerUpdates *PeerUpdates
	Channels    map[ChannelID]*ChannelShim
}

ReactorShim defines a generic shim wrapper around a BaseReactor. It is responsible for wiring up legacy p2p behavior to the new p2p semantics (e.g. proxying Envelope messages to legacy peers).

func NewReactorShim

func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim

func (*ReactorShim) AddPeer

func (rs *ReactorShim) AddPeer(peer Peer)

AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdateCh. The embedding reactor must be sure to listen for messages on this channel to handle adding a peer.

func (*ReactorShim) GetChannel

func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel

GetChannel returns a p2p Channel reference for a given ChannelID. If no Channel exists, nil is returned.

func (*ReactorShim) GetChannels

func (rs *ReactorShim) GetChannels() []*ChannelDescriptor

GetChannels implements the legacy Reactor interface for getting a slice of all the supported ChannelDescriptors.

func (*ReactorShim) OnStart

func (rs *ReactorShim) OnStart() error

OnStart executes the reactor shim's OnStart hook, where we start all the goroutines needed to proxy peer envelopes and errors per p2p Channel.

func (*ReactorShim) Receive

func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte)

Receive implements a generic wrapper around implementing the Receive method on the legacy Reactor p2p interface. If the reactor is running, Receive will find the corresponding new p2p Channel, create and decode the appropriate proto.Message from the msgBytes, execute any validation and finally construct and send a p2p Envelope on the appropriate p2p Channel.

func (*ReactorShim) RemovePeer

func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{})

RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdateCh. The embedding reactor must be sure to listen for messages on this channel to handle removing a peer.

type Router

type Router struct {
	*service.BaseService
	// contains filtered or unexported fields
}

Router manages peer connections and routes messages between peers and reactor channels. It takes a PeerManager for peer lifecycle management (e.g. which peers to dial and when) and a set of Transports for connecting and communicating with peers.

On startup, three main goroutines are spawned to maintain peer connections:

dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
address to dial and spawns a goroutine that dials the peer, handshakes
with it, and begins to route messages if successful.

acceptPeers(): in a loop, waits for an inbound connection via
Transport.Accept() and spawns a goroutine that handshakes with it and
begins to route messages if successful.

evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
peer to evict, and disconnects it by closing its message queue.

When a peer is connected, an outbound peer message queue is registered in peerQueues, and routePeer() is called to spawn off two additional goroutines:

sendPeer(): waits for an outbound message from the peerQueues queue,
marshals it, and passes it to the peer transport which delivers it.

receivePeer(): waits for an inbound message from the peer transport,
unmarshals it, and passes it to the appropriate inbound channel queue
in channelQueues.

When a reactor opens a channel via OpenChannel, an inbound channel message queue is registered in channelQueues, and a channel goroutine is spawned:

routeChannel(): waits for an outbound message from the channel, looks
up the recipient peer's outbound message queue in peerQueues, and submits
the message to it.

All channel sends in the router are blocking. It is the responsibility of the queue interface in peerQueues and channelQueues to prioritize and drop messages as appropriate during contention to prevent stalls and ensure good quality of service.

func NewRouter

func NewRouter(
	logger log.Logger,
	nodeInfo NodeInfo,
	privKey crypto.PrivKey,
	peerManager *PeerManager,
	transports []Transport,
	options RouterOptions,
) (*Router, error)

NewRouter creates a new Router. The given Transports must already be listening on appropriate interfaces, and will be closed by the Router when it stops.

func (*Router) OnStart

func (r *Router) OnStart() error

OnStart implements service.Service.

func (*Router) OnStop

func (r *Router) OnStop()

OnStop implements service.Service.

All channels must be closed by OpenChannel() callers before stopping the router, to prevent blocked channel sends in reactors. Channels are not closed here, since that would cause any reactor senders to panic, so it is the sender's responsibility.

func (*Router) OpenChannel

func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error)

OpenChannel opens a new channel for the given message type. The caller must close the channel when done, before stopping the Router. messageType is the type of message passed through the channel (used for unmarshaling), which can implement Wrapper to automatically (un)wrap multiple message types in a wrapper message. The caller may provide a size to make the channel buffered, which internally makes the inbound, outbound, and error channel buffered.

type RouterOptions

type RouterOptions struct {
	// ResolveTimeout is the timeout for resolving NodeAddress URLs.
	// 0 means no timeout.
	ResolveTimeout time.Duration

	// DialTimeout is the timeout for dialing a peer. 0 means no timeout.
	DialTimeout time.Duration

	// HandshakeTimeout is the timeout for handshaking with a peer. 0 means
	// no timeout.
	HandshakeTimeout time.Duration
}

RouterOptions specifies options for a Router.

func (*RouterOptions) Validate

func (o *RouterOptions) Validate() error

Validate validates router options.

type Switch

type Switch struct {
	service.BaseService
	// contains filtered or unexported fields
}

Switch handles peer connections and exposes an API to receive incoming messages on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one or more `Channels`. So while sending outgoing messages is typically performed on the peer, incoming messages are received on the reactor.

func MakeConnectedSwitches

func MakeConnectedSwitches(cfg *config.P2PConfig,
	n int,
	initSwitch func(int, *Switch) *Switch,
	connect func([]*Switch, int, int),
) []*Switch

MakeConnectedSwitches returns n switches, connected according to the connect func. If connect==Connect2Switches, the switches will be fully connected. initSwitch defines how the i'th switch should be initialized (i.e. with which reactors). NOTE: panics if any switch fails to start.

func MakeSwitch

func MakeSwitch(
	cfg *config.P2PConfig,
	i int,
	network, version string,
	initSwitch func(int, *Switch) *Switch,
	logger log.Logger,
	opts ...SwitchOption,
) *Switch

func NewSwitch

func NewSwitch(
	cfg *config.P2PConfig,
	transport Transport,
	options ...SwitchOption,
) *Switch

NewSwitch creates a new Switch with the given config.

func (*Switch) AddPersistentPeers

func (sw *Switch) AddPersistentPeers(addrs []string) error

AddPersistentPeers sets the persistent peers. It ignores ErrNetAddressLookup. If any other error occurs, the first one encountered is returned.

func (*Switch) AddPrivatePeerIDs

func (sw *Switch) AddPrivatePeerIDs(ids []string) error

func (*Switch) AddReactor

func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor

AddReactor adds the given reactor to the switch. NOTE: Not goroutine safe.

func (*Switch) AddUnconditionalPeerIDs

func (sw *Switch) AddUnconditionalPeerIDs(ids []string) error

func (*Switch) Broadcast

func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool

Broadcast runs a goroutine for each attempted send, which will block trying to send for defaultSendTimeoutSeconds. It returns a channel which receives a success value for each attempted send (false if the send times out). The channel is closed once the msg bytes have been sent to all peers (or the sends have timed out).

NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.

func (*Switch) DialPeerWithAddress

func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error

DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully. If we're currently dialing this address or it belongs to an existing peer, ErrCurrentlyDialingOrExistingAddress is returned.

func (*Switch) DialPeersAsync

func (sw *Switch) DialPeersAsync(peers []string) error

DialPeersAsync dials a list of peers asynchronously in random order. Used to dial peers from config on startup or from unsafe-RPC (trusted sources). It ignores ErrNetAddressLookup. If any other error occurs, the first one encountered is returned. It is a no-op if there are no peers.

func (*Switch) IsDialingOrExistingAddress

func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool

IsDialingOrExistingAddress returns true if the switch has a peer with the given address or is dialing it at the moment.

func (*Switch) IsPeerPersistent

func (sw *Switch) IsPeerPersistent(na *NetAddress) bool

func (*Switch) IsPeerUnconditional

func (sw *Switch) IsPeerUnconditional(id NodeID) bool

func (*Switch) MarkPeerAsGood

func (sw *Switch) MarkPeerAsGood(peer Peer)

MarkPeerAsGood marks the given peer as good when it does something useful, such as contributing to consensus.

func (*Switch) MaxNumOutboundPeers

func (sw *Switch) MaxNumOutboundPeers() int

MaxNumOutboundPeers returns the maximum number of outbound peers.

func (*Switch) NetAddress

func (sw *Switch) NetAddress() *NetAddress

NetAddress returns the first address the switch is listening on, or nil if no addresses are found.

func (*Switch) NodeInfo

func (sw *Switch) NodeInfo() NodeInfo

NodeInfo returns the switch's NodeInfo. NOTE: Not goroutine safe.

func (*Switch) NumPeers

func (sw *Switch) NumPeers() (outbound, inbound, dialing int)

NumPeers returns the count of outbound, inbound, and outbound-dialing peers. Unconditional peers are not counted here.

func (*Switch) OnStart

func (sw *Switch) OnStart() error

OnStart implements BaseService. It starts all the reactors and peers.

func (*Switch) OnStop

func (sw *Switch) OnStop()

OnStop implements BaseService. It stops all peers and reactors.

func (*Switch) Peers

func (sw *Switch) Peers() IPeerSet

Peers returns the set of peers that are connected to the switch.

func (*Switch) PutChannelDescsIntoTransport

func (sw *Switch) PutChannelDescsIntoTransport()

FIXME: Eww, needed to wire up the new P2P stack along with the old one. This should be passed into the transport when it's constructed.

func (*Switch) Reactor

func (sw *Switch) Reactor(name string) Reactor

Reactor returns the reactor with the given name. NOTE: Not goroutine safe.

func (*Switch) Reactors

func (sw *Switch) Reactors() map[string]Reactor

Reactors returns a map of reactors registered on the switch. NOTE: Not goroutine safe.

func (*Switch) RemoveReactor

func (sw *Switch) RemoveReactor(name string, reactor Reactor)

RemoveReactor removes the given Reactor from the Switch. NOTE: Not goroutine safe.

func (*Switch) SetAddrBook

func (sw *Switch) SetAddrBook(addrBook AddrBook)

SetAddrBook sets the address book on the Switch.

func (*Switch) SetNodeInfo

func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo)

SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. NOTE: Not goroutine safe.

func (*Switch) SetNodeKey

func (sw *Switch) SetNodeKey(nodeKey NodeKey)

SetNodeKey sets the switch's private key for authenticated encryption. NOTE: Not goroutine safe.

func (*Switch) StopPeerForError

func (sw *Switch) StopPeerForError(peer Peer, reason interface{})

StopPeerForError disconnects from a peer due to external error. If the peer is persistent, it will attempt to reconnect. TODO: make record depending on reason.

func (*Switch) StopPeerGracefully

func (sw *Switch) StopPeerGracefully(peer Peer)

StopPeerGracefully disconnects from a peer gracefully. TODO: handle graceful disconnects.

type SwitchOption

type SwitchOption func(*Switch)

SwitchOption sets an optional parameter on the Switch.

func SwitchConnFilters

func SwitchConnFilters(filters ...ConnFilterFunc) SwitchOption

SwitchConnFilters sets the filters for rejection of connections.

func SwitchFilterTimeout

func SwitchFilterTimeout(timeout time.Duration) SwitchOption

SwitchFilterTimeout sets the timeout used for peer filters.

func SwitchPeerFilters

func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption

SwitchPeerFilters sets the filters for rejection of new peers.

func WithMetrics

func WithMetrics(metrics *Metrics) SwitchOption

WithMetrics sets the metrics.

type Transport

type Transport interface {
	// Protocols returns the protocols supported by the transport. The Router
	// uses this to pick a transport for an Endpoint.
	Protocols() []Protocol

	// Endpoints returns the local endpoints the transport is listening on, if any.
	//
	// How to listen is transport-dependent, e.g. MConnTransport uses Listen() while
	// MemoryTransport starts listening via MemoryNetwork.CreateTransport().
	Endpoints() []Endpoint

	// Accept waits for the next inbound connection on a listening endpoint, blocking
	// until either a connection is available or the transport is closed. On closure,
	// io.EOF is returned and further Accept calls are futile.
	Accept() (Connection, error)

	// Dial creates an outbound connection to an endpoint.
	Dial(context.Context, Endpoint) (Connection, error)

	// Close stops accepting new connections, but does not close active connections.
	Close() error

	// Stringer is used to display the transport, e.g. in logs.
	//
	// Without this, the logger may use reflection to access and display
	// internal fields. These can be written to concurrently, which can trigger
	// the race detector or even cause a panic.
	fmt.Stringer
}

Transport is a connection-oriented mechanism for exchanging data with a peer.

type Wrapper

type Wrapper interface {
	proto.Message

	// Wrap will take a message and wrap it in this one if possible.
	Wrap(proto.Message) error

	// Unwrap will unwrap the inner message contained in this message.
	Unwrap() (proto.Message, error)
}

Wrapper is a Protobuf message that can contain a variety of inner messages (e.g. via oneof fields). If a Channel's message type implements Wrapper, the Router will automatically wrap outbound messages and unwrap inbound messages, such that reactors do not have to do this themselves.
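
The wrap and unwrap behavior can be sketched without Protobuf. In the example below, the message interface is a hypothetical stand-in for proto.Message, and ping, pong, and pingPongMsg are invented types; pingPongMsg imitates a oneof by holding at most one non-nil inner message.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for proto.Message.
type message interface {
	Name() string
}

// wrapper mirrors the shape of the Wrapper interface using the stand-in type.
type wrapper interface {
	message
	Wrap(message) error
	Unwrap() (message, error)
}

type ping struct{}

func (ping) Name() string { return "ping" }

type pong struct{}

func (pong) Name() string { return "pong" }

// pingPongMsg holds at most one inner message, similar to a oneof field.
type pingPongMsg struct {
	Ping *ping
	Pong *pong
}

var _ wrapper = (*pingPongMsg)(nil)

func (m *pingPongMsg) Name() string { return "pingPongMsg" }

// Wrap stores inner in the matching field and clears the other one.
func (m *pingPongMsg) Wrap(inner message) error {
	switch v := inner.(type) {
	case *ping:
		m.Ping, m.Pong = v, nil
	case *pong:
		m.Ping, m.Pong = nil, v
	default:
		return fmt.Errorf("unknown message type %T", inner)
	}
	return nil
}

// Unwrap returns whichever inner message is set.
func (m *pingPongMsg) Unwrap() (message, error) {
	switch {
	case m.Ping != nil:
		return m.Ping, nil
	case m.Pong != nil:
		return m.Pong, nil
	default:
		return nil, errors.New("wrapper contains no message")
	}
}

func main() {
	w := &pingPongMsg{}
	if err := w.Wrap(&pong{}); err != nil {
		panic(err)
	}
	inner, err := w.Unwrap()
	if err != nil {
		panic(err)
	}
	fmt.Println("unwrapped:", inner.Name())
}
```

A channel whose message type behaves like pingPongMsg lets the sending and receiving code work with ping and pong values directly, which is the convenience the Wrapper description refers to.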

Directories

Path Synopsis
Taken from taipei-torrent.
Taken from taipei-torrent.
