p2pv2

package
v0.0.0-...-d9e9996 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2019 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Overview

Package p2pv2 is the v2 of AmpChain p2p network.

Index

Constants

View Source
const (
	DefaultStrategy           FilterStrategy = "DefaultStrategy"
	BucketsStrategy                          = "BucketsStrategy"
	NearestBucketStrategy                    = "NearestBucketStrategy"
	BucketsWithFactorStrategy                = "BucketsWithFactorStrategy"
	CorePeersStrategy                        = "CorePeersStrategy"
)

supported filter strategies

View Source
const (
	MsgChanSize         = 50000
	MsgHandledCacheSize = 50000
)

define default message config

View Source
const (
	AmperProtocolID       = "/amper/2.0.0" // protocol version
	MaxBroadCastPeers     = 20             // the maximum peers to broadcast messages
	MaxBroadCastCorePeers = 10             // the maximum core peers to broadcast messages
)

define the common config

Variables

View Source
var (
	ErrSubscribe       = errors.New("subscribe error")
	ErrAlreadyRegisted = errors.New("subscriber already registered")
	ErrUnregister      = errors.New("unregister subscriber error")
)

define errors

View Source
var (
	ErrGenerateOpts     = errors.New("generate host opts error")
	ErrCreateHost       = errors.New("create host error")
	ErrCreateKadDht     = errors.New("create kad dht error")
	ErrCreateStreamPool = errors.New("create stream pool error")
	ErrCreateBootStrap  = errors.New("create bootstrap error pool error")
	ErrConnectBootStrap = errors.New("error to connect to all bootstrap")
	ErrConnectCorePeers = errors.New("error to connect to all core peers")
	ErrInvalidParams    = errors.New("invalid params")
)

define errors

View Source
var (
	ErrValidateConfig   = errors.New("config not valid")
	ErrCreateNode       = errors.New("create node error")
	ErrCreateHandlerMap = errors.New("create handlerMap error")
)

define errors

View Source
var (
	ErrTimeout     = errors.New("request time out")
	ErrNullResult  = errors.New("request result is null")
	ErrStrNotValid = errors.New("stream not valid")
)

define common errors

View Source
var (
	ErrStreamNotFound = errors.New("stream not found")
	ErrStreamPoolFull = errors.New("stream pool is full")
	ErrAddStream      = errors.New("error to add stream")
	ErrRequest        = errors.New("error request from network")
	ErrAuth           = errors.New("error invalid auth request")
)

define common errors

Functions

func GenerateKeyPairWithPath

func GenerateKeyPairWithPath(path string) error

GenerateKeyPairWithPath generates an amper net key pair at the given path.

func GenerateUniqueRandList

func GenerateUniqueRandList(size int, max int) []int

GenerateUniqueRandList returns a list of unique random numbers.

func GetAuthRequest

func GetAuthRequest(v *AChainAddrInfo) (*pb.IdentityAuth, error)

GetAuthRequest get auth request for authentication

func GetKeyPairFromPath

func GetKeyPairFromPath(path string) (crypto.PrivKey, error)

GetKeyPairFromPath get amper net key from file path

func GetPeerIDFromPath

func GetPeerIDFromPath(keypath string) (string, error)

GetPeerIDFromPath return peer id of given private key path

Types

type AChainAddrInfo

type AChainAddrInfo struct {
	Addr   string
	Pubkey []byte
	Prikey []byte
	PeerID string
}

AChainAddrInfo holds the local node's AmpChain address info.

type BucketsFilter

type BucketsFilter struct {
	// contains filtered or unexported fields
}

BucketsFilter define filter that get all peers in buckets

func (*BucketsFilter) Filter

func (bf *BucketsFilter) Filter() ([]peer.ID, error)

Filter broadcasts in layers according to the buckets.

type BucketsFilterWithFactor

type BucketsFilterWithFactor struct {
	// contains filtered or unexported fields
}

BucketsFilterWithFactor define filter that get a certain percentage peers in each bucket

func (*BucketsFilterWithFactor) Filter

func (nf *BucketsFilterWithFactor) Filter() ([]peer.ID, error)

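Filter picks a Factor proportion of peers from each bucket for broadcasting. Each bucket is divided evenly into several splits, and a number of nodes are drawn from each split.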

*|<---------------- Bucket ---------------->|
*--------------------------------------------
*|        |        |        |        |      |
*--------------------------------------------
*       split1   split2    split3   split4 split5

type CorePeersFilter

type CorePeersFilter struct {
	// contains filtered or unexported fields
}

CorePeersFilter define filter for core peers

func (*CorePeersFilter) Filter

func (cp *CorePeersFilter) Filter() ([]peer.ID, error)

Filter select MaxBroadCastCorePeers random peers from core peers, half from current and half from next

func (*CorePeersFilter) SetRouteName

func (cp *CorePeersFilter) SetRouteName(name string)

SetRouteName set the core route name to filter in AmpChain, the route name is the blockchain name

type CorePeersInfo

type CorePeersInfo struct {
	Name           string   // distinguished name of the node routing
	CurrentTermNum int64    // the current term number
	CurrentPeerIDs []string // current core peer IDs
	NextPeerIDs    []string // upcoming core peer IDs
}

CorePeersInfo defines the peers' info for core nodes. By setting this info, we can keep some core peers always connected directly. It's useful for keeping DPoS key network security and for some BFT-like consensus.

type FilterStrategy

type FilterStrategy string

FilterStrategy defines the supported filter strategies

type HandlerMap

type HandlerMap struct {
	// contains filtered or unexported fields
}

HandlerMap the message handler manager keeps the message and handler mapping and recently handled messages

func NewHandlerMap

func NewHandlerMap(log log.Logger) (*HandlerMap, error)

NewHandlerMap create instance of HandlerMap

func (*HandlerMap) HandleMessage

func (hm *HandlerMap) HandleMessage(s *Stream, msg *amperp2p.AmperMessage) error

HandleMessage handle new messages with registered handlers

func (*HandlerMap) IsMsgAsHandled

func (hm *HandlerMap) IsMsgAsHandled(msg *amperp2p.AmperMessage) bool

IsMsgAsHandled used to check whether the msg has been dealt with.

func (*HandlerMap) MarkMsgAsHandled

func (hm *HandlerMap) MarkMsgAsHandled(msg *amperp2p.AmperMessage)

MarkMsgAsHandled used to mark message has been dealt with.

func (*HandlerMap) Register

func (hm *HandlerMap) Register(sub *Subscriber) (*Subscriber, error)

Register used to register subscriber to handlerMap.

func (*HandlerMap) Start

func (hm *HandlerMap) Start()

Start start message handling

func (*HandlerMap) Stop

func (hm *HandlerMap) Stop()

Stop stop message handling

func (*HandlerMap) UnRegister

func (hm *HandlerMap) UnRegister(sub *Subscriber) error

UnRegister used to un register subscriber from handlerMap.

type MessageOption

type MessageOption func(*msgOptions)

MessageOption define single option function

func WithBcName

func WithBcName(bcname string) MessageOption

WithBcName add bcname to message option

func WithFilters

func WithFilters(filter []FilterStrategy) MessageOption

WithFilters add filter strategies to message option

func WithPercentage

func WithPercentage(percentage float32) MessageOption

WithPercentage add percentage to message option

func WithTargetPeerAddrs

func WithTargetPeerAddrs(peerAddrs []string) MessageOption

WithTargetPeerAddrs add target peer addresses to message option

type MultiStrategy

type MultiStrategy struct {
	// contains filtered or unexported fields
}

MultiStrategy a peer filter that contains multiple filters

func NewMultiStrategy

func NewMultiStrategy(node *Node, filters []PeersFilter) *MultiStrategy

NewMultiStrategy create instance of MultiStrategy

func (*MultiStrategy) Filter

func (cp *MultiStrategy) Filter() ([]peer.ID, error)

Filter return peer IDs with multiple filters

type MultiSubscriber

type MultiSubscriber struct {
	// contains filtered or unexported fields
}

MultiSubscriber wrap a list of Subscriber of same message

type NearestBucketFilter

type NearestBucketFilter struct {
	// contains filtered or unexported fields
}

NearestBucketFilter define filter that get nearest peers from a specified peer ID

func (*NearestBucketFilter) Filter

func (nf *NearestBucketFilter) Filter() ([]peer.ID, error)

Filter broadcasts to the nearest bucket.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is the node in the network

func NewNode

func NewNode(cfg config.P2PConfig, log log.Logger) (*Node, error)

NewNode define the node of the amper, it will set streamHandler for this node.

func (*Node) Context

func (no *Node) Context() context.Context

Context return the node context

func (*Node) ListPeers

func (no *Node) ListPeers() []peer.ID

ListPeers return the list of peer ID in routing table

func (*Node) NodeID

func (no *Node) NodeID() peer.ID

NodeID return the node ID

func (*Node) SendMessage

func (no *Node) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID) error

SendMessage send message to given peers

func (*Node) SendMessageWithResponse

func (no *Node) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.AmperMessage, error)

SendMessageWithResponse send message to given peers, expecting response from peers

func (*Node) SetServer

func (no *Node) SetServer(srv *P2PServerV2)

SetServer set the p2p server of the node

func (*Node) Start

func (no *Node) Start()

Start start the node

func (*Node) Stop

func (no *Node) Stop()

Stop stop the node

func (*Node) UpdateCorePeers

func (no *Node) UpdateCorePeers(cp *CorePeersInfo) error

UpdateCorePeers update core peers' info and keep connection to core peers

type P2PServer

type P2PServer interface {
	Start()
	Stop()

	// 注册订阅者,支持多个用户订阅同一类消息
	Register(sub *Subscriber) (*Subscriber, error)
	// 注销订阅者,需要根据当时注册时返回的Subscriber实例删除
	UnRegister(sub *Subscriber) error

	SendMessage(context.Context, *p2pPb.AmperMessage, ...MessageOption) error
	// todo: 将请求的参数改为Option的方式
	SendMessageWithResponse(context.Context, *p2pPb.AmperMessage, ...MessageOption) ([]*p2pPb.AmperMessage, error)

	GetNetURL() string
	// 查询所连接节点的信息
	GetPeerUrls() []string

	// SetCorePeers set core peers' info to P2P server
	SetCorePeers(cp *CorePeersInfo) error

	// SetAChainAddr Set AmpChain address from AmpChaincore
	SetAChainAddr(bcname string, info *AChainAddrInfo)
}

P2PServer is the p2p server interface of Amper

type P2PServerV2

type P2PServerV2 struct {
	// contains filtered or unexported fields
}

P2PServerV2 is the v2 of the AmpChain p2p server. It is an implementation of the P2PServer interface.

func NewP2PServerV2

func NewP2PServerV2(cfg config.P2PConfig, lg log.Logger) (*P2PServerV2, error)

NewP2PServerV2 create P2PServerV2 instance

func (*P2PServerV2) GetNetURL

func (p *P2PServerV2) GetNetURL() string

GetNetURL returns the net URL of the amper node, in the form url = /ip4/127.0.0.1/tcp/<port>/p2p/<peer.Id>

func (*P2PServerV2) GetPeerUrls

func (p *P2PServerV2) GetPeerUrls() []string

GetPeerUrls queries the information of the connected peers.

func (*P2PServerV2) Register

func (p *P2PServerV2) Register(sub *Subscriber) (*Subscriber, error)

Register register message subscribers to handle messages

func (*P2PServerV2) SendMessage

func (p *P2PServerV2) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage,
	opts ...MessageOption) error

SendMessage send message to peers using given filter strategy

func (*P2PServerV2) SendMessageWithResponse

func (p *P2PServerV2) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage,
	opts ...MessageOption) ([]*p2pPb.AmperMessage, error)

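SendMessageWithResponse sends a message to peers using the given filter strategy and expects responses from the peers. When a client uses this method to request messages that carry a response, it is best to include a log_id; otherwise mismatched messages may be received, which affects subsequent processing.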

func (*P2PServerV2) SetAChainAddr

func (p *P2PServerV2) SetAChainAddr(bcname string, info *AChainAddrInfo)

SetAChainAddr Set AmpChain address info from core

func (*P2PServerV2) SetCorePeers

func (p *P2PServerV2) SetCorePeers(cp *CorePeersInfo) error

SetCorePeers set core peers' info to P2P server

func (*P2PServerV2) Start

func (p *P2PServerV2) Start()

Start start P2P server V2

func (*P2PServerV2) Stop

func (p *P2PServerV2) Stop()

Stop stop P2P server V2

func (*P2PServerV2) UnRegister

func (p *P2PServerV2) UnRegister(sub *Subscriber) error

UnRegister remove message subscribers

type PeersFilter

type PeersFilter interface {
	Filter() ([]peer.ID, error)
}

PeersFilter the interface for filter peers

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is the IO wrapper for the underlying P2P connection.

func NewStream

func NewStream(s net.Stream, no *Node) *Stream

NewStream create Stream instance

func (*Stream) Authenticate

func (s *Stream) Authenticate() error

Authenticate is used for identity authentication.

func (*Stream) Close

func (s *Stream) Close()

Close close the connected IO stream

func (*Stream) PeerID

func (s *Stream) PeerID() string

PeerID get peerID

func (*Stream) SendMessage

func (s *Stream) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage) error

SendMessage will send a message to a peer

func (*Stream) SendMessageWithResponse

func (s *Stream) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage) (*p2pPb.AmperMessage, error)

SendMessageWithResponse will send a message to a peer and wait for response

func (*Stream) Start

func (s *Stream) Start()

Start used to start

type StreamLimit

type StreamLimit struct {
	// contains filtered or unexported fields
}

StreamLimit limits the number of peer IDs from the same IP.

func (*StreamLimit) AddStream

func (sl *StreamLimit) AddStream(addrStr string, peerID peer.ID) bool

AddStream is used to increase the count for the same IP, adding one per call.

func (*StreamLimit) DelStream

func (sl *StreamLimit) DelStream(addrStr string)

DelStream is used to decrease the count for the same IP, subtracting one per call.

func (*StreamLimit) Init

func (sl *StreamLimit) Init(limit int64, lg log.Logger)

Init initialize the StreamLimit

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool manage all the stream

func NewStreamPool

func NewStreamPool(maxStreamLimit int32, no *Node, log log.Logger) (*StreamPool, error)

NewStreamPool create StreamPool instance

func (*StreamPool) Add

func (sp *StreamPool) Add(s net.Stream) *Stream

Add used to add a new net stream into pool

func (*StreamPool) AddStream

func (sp *StreamPool) AddStream(stream *Stream) error

AddStream used to add a new P2P stream into pool

func (*StreamPool) Authenticate

func (sp *StreamPool) Authenticate(stream *Stream) error

Authenticate it's used for identity authentication

func (*StreamPool) DelStream

func (sp *StreamPool) DelStream(stream *Stream) error

DelStream delete a stream

func (*StreamPool) FindStream

func (sp *StreamPool) FindStream(peer peer.ID) (*Stream, error)

FindStream gets the stream for the given peer ID.

func (*StreamPool) SendMessage

func (sp *StreamPool) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID) error

SendMessage send message to given peer ID

func (*StreamPool) SendMessageWithResponse

func (sp *StreamPool) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.AmperMessage, error)

SendMessageWithResponse will send a message to peers and collect their responses. withBreak means whether the request waits for all responses.

func (*StreamPool) Start

func (sp *StreamPool) Start()

Start start the stream pool

func (*StreamPool) Stop

func (sp *StreamPool) Stop()

Stop will stop all streams

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber define the subscriber of message

func NewSubscriber

func NewSubscriber(msgCh chan *amperp2p.AmperMessage, msgType amperp2p.AmperMessage_MessageType, handler amperHandler, msgFrom string) *Subscriber

NewSubscriber create instance of Subscriber

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL