p2pv2

package
v0.0.0-...-2d66582 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2020 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Overview

Package p2pv2 is the v2 of XuperChain p2p network.

Index

Constants

View Source
// Supported filter strategies.
//
// NOTE(review): only DefaultStrategy is declared with the FilterStrategy type.
// The remaining names have their own "= ..." expression and no explicit type,
// so they are untyped string constants rather than FilterStrategy values.
// They remain assignable to FilterStrategy; confirm whether this is intended.
const (
	DefaultStrategy           FilterStrategy = "DefaultStrategy"
	BucketsStrategy                          = "BucketsStrategy"
	NearestBucketStrategy                    = "NearestBucketStrategy"
	BucketsWithFactorStrategy                = "BucketsWithFactorStrategy"
	CorePeersStrategy                        = "CorePeersStrategy"
)

supported filter strategies

View Source
// Default message config.
//
// NOTE(review): judging by the names only, these look like the capacity of the
// message channel and of the handled-message cache; confirm at the use sites.
const (
	MsgChanSize         = 50000
	MsgHandledCacheSize = 50000
)

define default message config

View Source
// Common config.
const (
	XuperProtocolID    = "/xuper/2.0.0" // protocol version
	// P2PMultiAddrPrefix is the value documented as returned by
	// Node.GetP2PMultiAddrPrefix.
	P2PMultiAddrPrefix = "p2pMulti_"
)

define the common config

Variables

View Source
// Errors for subscribing, registering and unregistering subscribers.
var (
	ErrSubscribe       = errors.New("subscribe error")
	// NOTE(review): the identifier is spelled "Registed" while its message says
	// "registered". The name is exported, so renaming it would break callers.
	ErrAlreadyRegisted = errors.New("subscriber already registered")
	ErrUnregister      = errors.New("unregister subscriber error")
)

define errors

View Source
// Broadcast fan-out limits. These are declared as variables, not constants.
var (
	// MaxBroadCastPeers defines the maximum number of common peers to
	// broadcast messages to.
	MaxBroadCastPeers = 20
	// MaxBroadCastCorePeers defines the maximum number of core peers to
	// broadcast messages to.
	MaxBroadCastCorePeers = 10
)
View Source
// Errors whose messages cover generating host options, creating the host, the
// Kad DHT, the stream pool and the bootstrap, connecting to bootstrap and core
// peers, and invalid parameters.
var (
	ErrGenerateOpts     = errors.New("generate host opts error")
	ErrCreateHost       = errors.New("create host error")
	ErrCreateKadDht     = errors.New("create kad dht error")
	ErrCreateStreamPool = errors.New("create stream pool error")
	// NOTE(review): this message reads "create bootstrap error pool error"; the
	// trailing "pool error" looks copied from ErrCreateStreamPool above. Confirm
	// before changing it, since callers or log parsers may match on the text.
	ErrCreateBootStrap  = errors.New("create bootstrap error pool error")
	ErrConnectBootStrap = errors.New("error to connect to all bootstrap")
	ErrConnectCorePeers = errors.New("error to connect to all core peers")
	ErrInvalidParams    = errors.New("invalid params")
)

define errors

View Source
// Errors for config validation, node creation and handlerMap creation.
var (
	ErrValidateConfig   = errors.New("config not valid")
	ErrCreateNode       = errors.New("create node error")
	ErrCreateHandlerMap = errors.New("create handlerMap error")
)

define errors

View Source
// Common errors: request timeout, null request result and invalid stream.
var (
	ErrTimeout     = errors.New("request time out")
	ErrNullResult  = errors.New("request result is null")
	// ErrStrNotValid: per its message, "Str" abbreviates "stream", not "string".
	ErrStrNotValid = errors.New("stream not valid")
)

define common errors

View Source
// Common errors for stream lookup, stream pool capacity, adding streams,
// network requests and auth requests.
var (
	ErrStreamNotFound = errors.New("stream not found")
	ErrStreamPoolFull = errors.New("stream pool is full")
	ErrAddStream      = errors.New("error to add stream")
	ErrRequest        = errors.New("error request from network")
	ErrAuth           = errors.New("error invalid auth request")
)

define common errors

Functions

func GenerateKeyPairWithPath

func GenerateKeyPairWithPath(path string) error

GenerateKeyPairWithPath generate xuper net key pair

func GenerateUniqueRandList

func GenerateUniqueRandList(size int, max int) []int

GenerateUniqueRandList get a random unique number list

func GetAuthRequest

func GetAuthRequest(v *XchainAddrInfo) (*pb.IdentityAuth, error)

GetAuthRequest get auth request for authentication

func GetIDFromAddr

func GetIDFromAddr(peerAddr string) (peer.ID, error)

GetIDFromAddr return peer ID corresponding to peerAddr

func GetKeyPairFromPath

func GetKeyPairFromPath(path string) (crypto.PrivKey, error)

GetKeyPairFromPath get xuper net key from file path

func GetPeerIDFromPath

func GetPeerIDFromPath(keypath string) (string, error)

GetPeerIDFromPath return peer id of given private key path

Types

type BucketsFilter

type BucketsFilter struct {
	// contains filtered or unexported fields
}

BucketsFilter define filter that get all peers in buckets

func (*BucketsFilter) Filter

func (bf *BucketsFilter) Filter() ([]peer.ID, error)

Filter broadcasts in layers according to the buckets.

type BucketsFilterWithFactor

type BucketsFilterWithFactor struct {
	// contains filtered or unexported fields
}

BucketsFilterWithFactor define filter that get a certain percentage peers in each bucket

func (*BucketsFilterWithFactor) Filter

func (nf *BucketsFilterWithFactor) Filter() ([]peer.ID, error)

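Filter selects a Factor proportion of the peers in each bucket for broadcasting. Each bucket is divided evenly into several splits, and several nodes are drawn from each split.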

*|<---------------- Bucket ---------------->|
*--------------------------------------------
*|        |        |        |        |      |
*--------------------------------------------
*       split1   split2    split3   split4 split5

type CorePeersFilter

type CorePeersFilter struct {
	// contains filtered or unexported fields
}

CorePeersFilter define filter for core peers

func (*CorePeersFilter) Filter

func (cp *CorePeersFilter) Filter() ([]peer.ID, error)

Filter select MaxBroadCastCorePeers random peers from core peers, half from current and half from next

func (*CorePeersFilter) SetRouteName

func (cp *CorePeersFilter) SetRouteName(name string)

SetRouteName set the core route name to filter in XuperChain, the route name is the blockchain name

type CorePeersInfo

// CorePeersInfo defines the peers' info for core nodes.
// It carries a route name, the current term number, and the peer IDs of the
// current and the upcoming core peers.
type CorePeersInfo struct {
	Name           string   // distinguished name of the node routing
	CurrentTermNum int64    // the current term number
	CurrentPeerIDs []string // current core peer IDs
	NextPeerIDs    []string // upcoming core peer IDs
}

CorePeersInfo defines the peers' info for core nodes. By setting this info, we can keep some core peers always connected directly. It's useful for keeping DPoS key network security and for some BFT-like consensus.

type FilterStrategy

type FilterStrategy string

FilterStrategy defines the supported filter strategies

type HandlerMap

type HandlerMap struct {
	// contains filtered or unexported fields
}

HandlerMap is the message handler manager; it keeps the message-to-handler mapping and the recently handled messages.

func NewHandlerMap

func NewHandlerMap(log log.Logger) (*HandlerMap, error)

NewHandlerMap create instance of HandlerMap

func (*HandlerMap) HandleMessage

func (hm *HandlerMap) HandleMessage(s *Stream, msg *xuperp2p.XuperMessage) error

HandleMessage handle new messages with registered handlers

func (*HandlerMap) IsMsgAsHandled

func (hm *HandlerMap) IsMsgAsHandled(msg *xuperp2p.XuperMessage) bool

IsMsgAsHandled used to check whether the msg has been dealt with.

func (*HandlerMap) MarkMsgAsHandled

func (hm *HandlerMap) MarkMsgAsHandled(msg *xuperp2p.XuperMessage)

MarkMsgAsHandled used to mark message has been dealt with.

func (*HandlerMap) Register

func (hm *HandlerMap) Register(sub *Subscriber) (*Subscriber, error)

Register used to register subscriber to handlerMap.

func (*HandlerMap) Start

func (hm *HandlerMap) Start()

Start start message handling

func (*HandlerMap) Stop

func (hm *HandlerMap) Stop()

Stop stop message handling

func (*HandlerMap) UnRegister

func (hm *HandlerMap) UnRegister(sub *Subscriber) error

UnRegister used to un register subscriber from handlerMap.

type MessageOption

type MessageOption func(*msgOptions)

MessageOption define single option function

func WithBcName

func WithBcName(bcname string) MessageOption

WithBcName add bcname to message option

func WithCompress

func WithCompress(compress bool) MessageOption

WithCompress sets the compress flag in the message option

func WithFilters

func WithFilters(filter []FilterStrategy) MessageOption

WithFilters add filter strategies to message option

func WithPercentage

func WithPercentage(percentage float32) MessageOption

WithPercentage add percentage to message option

func WithTargetPeerAddrs

func WithTargetPeerAddrs(peerAddrs []string) MessageOption

WithTargetPeerAddrs add target peer addresses to message option

func WithTargetPeerIDs

func WithTargetPeerIDs(pid []string) MessageOption

WithTargetPeerIDs add target peer IDs to message option

type MockP2pServer

type MockP2pServer struct {
}

MockP2pServer is a mock struct of the P2PServer interface. Used in unit tests.

func (*MockP2pServer) GetNetURL

func (mp *MockP2pServer) GetNetURL() string

GetNetURL implements the GetNetURL interface

func (*MockP2pServer) GetPeerUrls

func (mp *MockP2pServer) GetPeerUrls() []string

GetPeerUrls implements the GetPeerUrls interface

func (*MockP2pServer) Register

func (mp *MockP2pServer) Register(sub *Subscriber) (*Subscriber, error)

Register implements the Register interface

func (*MockP2pServer) SendMessage

func (mp *MockP2pServer) SendMessage(context context.Context, msg *p2pPb.XuperMessage, opts ...MessageOption) error

SendMessage implements the SendMessage interface

func (*MockP2pServer) SendMessageWithResponse

func (mp *MockP2pServer) SendMessageWithResponse(context.Context,
	*p2pPb.XuperMessage, ...MessageOption) ([]*p2pPb.XuperMessage, error)

SendMessageWithResponse implements the SendMessageWithResponse interface

func (*MockP2pServer) SetCorePeers

func (mp *MockP2pServer) SetCorePeers(corePeers *CorePeersInfo) error

SetCorePeers implements the SetCorePeers interface

func (*MockP2pServer) SetXchainAddr

func (mp *MockP2pServer) SetXchainAddr(bcname string, info *XchainAddrInfo)

SetXchainAddr implements the SetXchainAddr interface

func (*MockP2pServer) Start

func (mp *MockP2pServer) Start()

Start implements the start interface

func (*MockP2pServer) Stop

func (mp *MockP2pServer) Stop()

Stop implements the Stop interface

func (*MockP2pServer) UnRegister

func (mp *MockP2pServer) UnRegister(sub *Subscriber) error

UnRegister implements the UnRegister interface

type MultiStrategy

type MultiStrategy struct {
	// contains filtered or unexported fields
}

MultiStrategy a peer filter that contains multiple filters

func NewMultiStrategy

func NewMultiStrategy(node *Node, filters []PeersFilter, extraPeers []peer.ID) *MultiStrategy

NewMultiStrategy create instance of MultiStrategy

func (*MultiStrategy) Filter

func (cp *MultiStrategy) Filter() ([]peer.ID, error)

Filter return peer IDs with multiple filters

type MultiSubscriber

type MultiSubscriber struct {
	// contains filtered or unexported fields
}

MultiSubscriber wrap a list of Subscriber of same message

type NearestBucketFilter

type NearestBucketFilter struct {
	// contains filtered or unexported fields
}

NearestBucketFilter define filter that get nearest peers from a specified peer ID

func (*NearestBucketFilter) Filter

func (nf *NearestBucketFilter) Filter() ([]peer.ID, error)

Filter broadcasts to the nearest bucket.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is the node in the network

func NewNode

func NewNode(cfg config.P2PConfig, log log.Logger) (*Node, error)

NewNode define the node of the xuper, it will set streamHandler for this node.

func (*Node) ConnectToPeersByAddr

func (no *Node) ConnectToPeersByAddr(addrs []string) int

ConnectToPeersByAddr provide connection support using peer address(netURL)

func (*Node) Context

func (no *Node) Context() context.Context

Context return the node context

func (*Node) GetP2PMultiAddrPrefix

func (no *Node) GetP2PMultiAddrPrefix() string

GetP2PMultiAddrPrefix return P2PMultiAddrPrefix

func (*Node) ListPeers

func (no *Node) ListPeers() []peer.ID

ListPeers return the list of peer ID in routing table

func (*Node) NodeID

func (no *Node) NodeID() peer.ID

NodeID return the node ID

func (*Node) SendMessage

func (no *Node) SendMessage(ctx context.Context, msg *p2pPb.XuperMessage, peers []peer.ID) error

SendMessage send message to given peers

func (*Node) SendMessageWithResponse

func (no *Node) SendMessageWithResponse(ctx context.Context, msg *p2pPb.XuperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.XuperMessage, error)

SendMessageWithResponse send message to given peers, expecting response from peers

func (*Node) SetServer

func (no *Node) SetServer(srv *P2PServerV2)

SetServer set the p2p server of the node

func (*Node) Start

func (no *Node) Start()

Start start the node

func (*Node) Stop

func (no *Node) Stop()

Stop stop the node

func (*Node) UpdateCorePeers

func (no *Node) UpdateCorePeers(cp *CorePeersInfo) error

UpdateCorePeers update core peers' info and keep connection to core peers

type P2PServer

// P2PServer is the p2p server interface of Xuper.
type P2PServer interface {
	Start()
	Stop()

	// Register registers a subscriber; multiple users may subscribe to the
	// same type of message.
	Register(sub *Subscriber) (*Subscriber, error)
	// UnRegister removes a subscriber; the subscriber must be removed using
	// the Subscriber instance that was returned at registration time.
	UnRegister(sub *Subscriber) error

	SendMessage(context.Context, *p2pPb.XuperMessage, ...MessageOption) error

	SendMessageWithResponse(context.Context, *p2pPb.XuperMessage, ...MessageOption) ([]*p2pPb.XuperMessage, error)

	GetNetURL() string
	// GetPeerUrls queries the information of the connected peers.
	GetPeerUrls() []string

	// SetCorePeers set core peers' info to P2P server
	SetCorePeers(cp *CorePeersInfo) error

	// SetXchainAddr Set xchain address from xchaincore
	SetXchainAddr(bcname string, info *XchainAddrInfo)
}

P2PServer is the p2p server interface of Xuper

type P2PServerV2

type P2PServerV2 struct {
	// contains filtered or unexported fields
}

P2PServerV2 is the v2 of XuperChain p2p server. An implement of P2PServer interface.

func NewP2PServerV2

func NewP2PServerV2(cfg config.P2PConfig, lg log.Logger) (*P2PServerV2, error)

NewP2PServerV2 create P2PServerV2 instance

func (*P2PServerV2) GetNetURL

func (p *P2PServerV2) GetNetURL() string

GetNetURL returns the net URL of the xuper node. The URL has the form /ip4/127.0.0.1/tcp/<port>/p2p/<peer.Id>

func (*P2PServerV2) GetPeerUrls

func (p *P2PServerV2) GetPeerUrls() []string

GetPeerUrls queries the information of the connected peers.

func (*P2PServerV2) Register

func (p *P2PServerV2) Register(sub *Subscriber) (*Subscriber, error)

Register register message subscribers to handle messages

func (*P2PServerV2) SendMessage

func (p *P2PServerV2) SendMessage(ctx context.Context, msg *p2pPb.XuperMessage,
	opts ...MessageOption) error

SendMessage send message to peers using given filter strategy

func (*P2PServerV2) SendMessageWithResponse

func (p *P2PServerV2) SendMessageWithResponse(ctx context.Context, msg *p2pPb.XuperMessage,
	opts ...MessageOption) ([]*p2pPb.XuperMessage, error)

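SendMessageWithResponse sends a message to peers using the given filter strategy and expects responses from the peers. When a client uses this method to request a message with a response, it should include a log_id; otherwise mismatched messages may be received, which affects subsequent processing.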

func (*P2PServerV2) SetCorePeers

func (p *P2PServerV2) SetCorePeers(cp *CorePeersInfo) error

SetCorePeers set core peers' info to P2P server

func (*P2PServerV2) SetXchainAddr

func (p *P2PServerV2) SetXchainAddr(bcname string, info *XchainAddrInfo)

SetXchainAddr Set xchain address info from core

func (*P2PServerV2) Start

func (p *P2PServerV2) Start()

Start start P2P server V2

func (*P2PServerV2) Stop

func (p *P2PServerV2) Stop()

Stop stop P2P server V2

func (*P2PServerV2) UnRegister

func (p *P2PServerV2) UnRegister(sub *Subscriber) error

UnRegister remove message subscribers

type PeersFilter

// PeersFilter is the interface for filtering peers.
type PeersFilter interface {
	// Filter returns a list of peer IDs, or an error.
	Filter() ([]peer.ID, error)
}

PeersFilter the interface for filter peers

type StaticNodeStrategy

type StaticNodeStrategy struct {
	// contains filtered or unexported fields
}

StaticNodeStrategy a peer filter that contains strategy nodes

func (*StaticNodeStrategy) Filter

func (ss *StaticNodeStrategy) Filter() ([]peer.ID, error)

Filter return static nodes peers

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is the IO wrapper for the underlying P2P connection

func NewStream

func NewStream(s net.Stream, no *Node) *Stream

NewStream create Stream instance

func (*Stream) Authenticate

func (s *Stream) Authenticate() error

Authenticate it's used for identity authentication

func (*Stream) Close

func (s *Stream) Close()

Close close the connected IO stream

func (*Stream) PeerID

func (s *Stream) PeerID() string

PeerID get peerID

func (*Stream) SendMessage

func (s *Stream) SendMessage(ctx context.Context, msg *p2pPb.XuperMessage) error

SendMessage will send a message to a peer

func (*Stream) SendMessageWithResponse

func (s *Stream) SendMessageWithResponse(ctx context.Context, msg *p2pPb.XuperMessage) (*p2pPb.XuperMessage, error)

SendMessageWithResponse will send a message to a peer and wait for response

func (*Stream) Start

func (s *Stream) Start()

Start used to start

type StreamLimit

type StreamLimit struct {
	// contains filtered or unexported fields
}

StreamLimit limit the peerID amount of same ip

func (*StreamLimit) AddStream

func (sl *StreamLimit) AddStream(addrStr string, peerID peer.ID) bool

AddStream used to add the amount of same ip, plus one per call

func (*StreamLimit) DelStream

func (sl *StreamLimit) DelStream(addrStr string)

DelStream used to dec the amount of same ip, dec one per call

func (*StreamLimit) GetStreams

func (sl *StreamLimit) GetStreams() []string

GetStreams get all NetURLs from effective streams

func (*StreamLimit) Init

func (sl *StreamLimit) Init(limit int64, lg log.Logger)

Init initialize the StreamLimit

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool manage all the stream

func NewStreamPool

func NewStreamPool(maxStreamLimit int32, no *Node, log log.Logger) (*StreamPool, error)

NewStreamPool create StreamPool instance

func (*StreamPool) Add

func (sp *StreamPool) Add(s net.Stream) *Stream

Add used to add a new net stream into pool

func (*StreamPool) AddStream

func (sp *StreamPool) AddStream(stream *Stream) error

AddStream used to add a new P2P stream into pool

func (*StreamPool) Authenticate

func (sp *StreamPool) Authenticate(stream *Stream) error

Authenticate it's used for identity authentication

func (*StreamPool) DelStream

func (sp *StreamPool) DelStream(stream *Stream) error

DelStream delete a stream

func (*StreamPool) FindStream

func (sp *StreamPool) FindStream(peer peer.ID) (*Stream, error)

FindStream get the stream between given peer ID

func (*StreamPool) SendMessage

func (sp *StreamPool) SendMessage(ctx context.Context, msg *p2pPb.XuperMessage, peers []peer.ID) error

SendMessage send message to given peer ID

func (*StreamPool) SendMessageWithResponse

func (sp *StreamPool) SendMessageWithResponse(ctx context.Context, msg *p2pPb.XuperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.XuperMessage, error)

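SendMessageWithResponse will send a message to peers and collect their responses. withBreak means whether the request waits for all responses (the signature shown here takes a percentage parameter and has no withBreak parameter).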

func (*StreamPool) Start

func (sp *StreamPool) Start()

Start start the stream pool

func (*StreamPool) Stop

func (sp *StreamPool) Stop()

Stop will stop all streams

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber define the subscriber of message

func NewSubscriber

func NewSubscriber(msgCh chan *xuperp2p.XuperMessage, msgType xuperp2p.XuperMessage_MessageType, handler xuperHandler, msgFrom string) *Subscriber

NewSubscriber create instance of Subscriber

type XchainAddrInfo

// XchainAddrInfo holds the local xchain address info.
//
// NOTE(review): the field meanings below are inferred from the names only;
// confirm against the code that fills in this struct.
type XchainAddrInfo struct {
	Addr   string // presumably the xchain address
	Pubkey []byte // presumably the public key bytes; encoding not shown here
	Prikey []byte // presumably the private key bytes; encoding not shown here
	PeerID string // presumably the p2p peer ID in string form
}

XchainAddrInfo my xchain addr info

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL