Documentation ¶
Overview ¶
sync
Index ¶
- Constants
- Variables
- func FormatSyncBinKey(bin uint8) string
- func ParseSyncBinKey(s string) (uint8, error)
- func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)
- func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)
- type API
- type ChunkDeliveryMsg
- type ChunkDeliveryMsgRetrieval
- type ChunkDeliveryMsgSyncing
- type Client
- type Delivery
- func (d *Delivery) AttachBzz(bzz *network.Bzz)
- func (d *Delivery) GetConnectedNodes() (int, int)
- func (d *Delivery) GetDataFromCentral(ctx context.Context, address storage.Address)
- func (d *Delivery) GetReceiptsLogs() []state.Receipts
- func (d *Delivery) GetReceivedChunkInfo() map[common.Address]int64
- func (d *Delivery) IncreaseAccount(account common.Address)
- func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error)
- func (d *Delivery) SetSyncBandlimit(syncBandLimit int)
- func (d *Delivery) SyncEnabled() bool
- func (d *Delivery) UpdateNodes(nodes []string)
- type Handover
- type HandoverProof
- type OfferedHashesMsg
- type Peer
- func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
- func (p *Peer) EndRetrieve(address storage.Address)
- func (p *Peer) GetDelay() time.Duration
- func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error
- func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error
- func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error
- func (p *Peer) StartRetrieve(address storage.Address)
- type QuitMsg
- type Range
- type ReceiptsMsg
- type Registry
- func (r *Registry) APIs() []rpc.API
- func (r *Registry) Close() error
- func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)
- func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)
- func (r *Registry) GetSpec() *protocols.Spec
- func (r *Registry) Protocols() []p2p.Protocol
- func (r *Registry) Quit(peerId enode.ID, s Stream) error
- func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))
- func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))
- func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error
- func (r *Registry) Run(p *network.BzzPeer) error
- func (r *Registry) Start(server *p2p.Server) error
- func (r *Registry) Stop() error
- func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error
- func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error
- type RegistryOptions
- type RequestSubscriptionMsg
- type RetrieveRequestMsg
- type Server
- type Stream
- type StreamerPrices
- type SubscribeErrorMsg
- type SubscribeMsg
- type SwarmChunkServer
- func (s *SwarmChunkServer) Close()
- func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)
- func (s *SwarmChunkServer) SessionIndex() (uint64, error)
- func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
- type SwarmSyncerClient
- type SwarmSyncerServer
- type Takeover
- type TakeoverProof
- type TakeoverProofMsg
- type TrafficLoad
- type UnsubscribeMsg
- type WantedHashesMsg
- type WrappedPriorityMsg
Constants ¶
const ( Low uint8 = iota Mid High Top PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top PriorityQueueCap = 512 // queue capacity HashSize = 32 )
const (
BatchSize = 128
)
const (
MAX_DELAY_CNT = 10
)
Variables ¶
var ErrMaxPeerServers = errors.New("max peer servers")
ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.
var (
ErrorLightNodeRejectRetrieval = errors.New("Light nodes reject retrieval")
)
Functions ¶
func FormatSyncBinKey ¶
FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.
func ParseSyncBinKey ¶
ParseSyncBinKey parses the string representation and returns the Kademlia bin number.
func RegisterSwarmSyncerClient ¶
func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)
RegisterSwarmSyncerClient registers the client constructor function for to handle incoming sync streams
func RegisterSwarmSyncerServer ¶
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
func (*API) GetPeerSubscriptions ¶
GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects.
func (*API) SubscribeStream ¶
type ChunkDeliveryMsg ¶
type ChunkDeliveryMsg struct { Addr storage.Address SData []byte // the stored chunk Data (incl size) // contains filtered or unexported fields }
Chunk delivery always uses the same message type....
type ChunkDeliveryMsgRetrieval ¶
type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
defines a chunk delivery for retrieval (with accounting)
type ChunkDeliveryMsgSyncing ¶
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
defines a chunk delivery for syncing (without accounting)
type Client ¶
type Client interface { NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() }
Client interface for incoming peer Streamer
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
func NewDelivery ¶
func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore, receiptStore *state.ReceiptStore) *Delivery
func (*Delivery) GetConnectedNodes ¶
func (*Delivery) GetDataFromCentral ¶
* not used, read one chunk from center is a very inefficient routine We will research if read from another data-distribute network is very viable
func (*Delivery) GetReceiptsLogs ¶
func (*Delivery) GetReceivedChunkInfo ¶
func (*Delivery) IncreaseAccount ¶
func (*Delivery) RequestFromPeers ¶
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error)
RequestFromPeers sends a chunk retrieve request to 发送一个chunk读取请求,ctx保存超时之类的信息,req是一个请求指令,包含了源地址和数据哈希,
func (*Delivery) SyncEnabled ¶
func (*Delivery) UpdateNodes ¶
type Handover ¶
type Handover struct { Stream Stream // name of stream Start, End uint64 // index of hashes Root []byte // Root hash for indexed segment inclusion proofs }
Handover represents a statement that the upstream peer hands over the stream section
type HandoverProof ¶
HandoverProof represents a signed statement that the upstream peer handed over the stream section
type OfferedHashesMsg ¶
type OfferedHashesMsg struct { Stream Stream // name of Stream From, To uint64 // peer and db-specific entry count Hashes []byte // stream of hashes (128) // Delayed uint64 *HandoverProof // HandoverProof }
OfferedHashesMsg is the protocol msg for offering to hand over a stream section
func (OfferedHashesMsg) String ¶
func (m OfferedHashesMsg) String() string
String pretty prints OfferedHashesMsg
type Peer ¶
Peer is the Peer extension for the streaming protocol
func (*Peer) Deliver ¶
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
Deliver sends a storeRequestMsg protocol message to the peer Depending on the `syncing` parameter we send different message types
func (*Peer) EndRetrieve ¶
func (*Peer) SendOfferedHashes ¶
SendOfferedHashes sends OfferedHashesMsg protocol msg
func (*Peer) SendPriority ¶
SendPriority sends message to the peer using the outgoing priority queue
func (*Peer) StartRetrieve ¶
type ReceiptsMsg ¶
收据消息,客户端从服务端收到检索的回应数据后,通过此格式向服务端发送签名 如果客户端总是向服务端提交一个新签名,即STime为新的,AMount为0,那么服务端就可以断开该客户端的连接,并将该节点加入黑名单 签名的收据总是用最低优先级发送,并且如果有新的可覆盖签名出现时,使用新的签名,可以直接丢弃老的签名
type Registry ¶
type Registry struct { NodeType uint8 // contains filtered or unexported fields }
Registry registry for outgoing and incoming streamer constructors
func NewRegistry ¶
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry
NewRegistry is Streamer constructor
func (*Registry) GetClientFunc ¶
GetClient accessor for incoming streamer constructors
func (*Registry) GetServerFunc ¶
GetServer accessor for incoming streamer constructors
func (*Registry) GetSpec ¶
GetSpec returns the streamer spec to callers This used to be a global variable but for simulations with multiple nodes its fields (notably the Hook) would be overwritten
func (*Registry) Quit ¶
Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.
func (*Registry) RegisterClientFunc ¶
RegisterClient registers an incoming streamer constructor
func (*Registry) RegisterServerFunc ¶
RegisterServer registers an outgoing streamer constructor
func (*Registry) RequestSubscription ¶
注册一个订阅,方法是向peer发送需要一个订阅消息 从这个角度来说说,每次peer建立后,都应该调用这个函数
type RegistryOptions ¶
type RegistryOptions struct { SkipCheck bool NodeType uint8 SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry }
RegistryOptions holds optional values for NewRegistry constructor.
type RequestSubscriptionMsg ¶
type RequestSubscriptionMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream
type RetrieveRequestMsg ¶
type RetrieveRequestMsg struct { Addr storage.Address SkipCheck bool RetrieveNow bool HopCount uint8 }
RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type Server ¶
type Server interface { // SessionIndex is called when a server is initialized // to get the current cursor state of the stream data. // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() }
Server interface for outgoing peer Streamer
type Stream ¶
type Stream struct { // Name is used for Client and Server functions identification. Name string // Key is the name of specific stream data. Key string // Live defines whether the stream delivers only new data // for the specific stream. Live bool }
Stream defines a unique stream identifier.
type StreamerPrices ¶
type StreamerPrices struct {
// contains filtered or unexported fields
}
An accountable message needs some meta information attached to it in order to evaluate the correct price
func (*StreamerPrices) Price ¶
func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price
Price implements the accounting interface and returns the price for a specific message
type SubscribeErrorMsg ¶
type SubscribeErrorMsg struct {
Error string
}
type SubscribeMsg ¶
type SubscribeMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
SubcribeMsg is the protocol msg for requesting a stream(section)
type SwarmChunkServer ¶
type SwarmChunkServer struct {
// contains filtered or unexported fields
}
SwarmChunkServer implements Server
func NewSwarmChunkServer ¶
func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer
NewSwarmChunkServer is SwarmChunkServer constructor
func (*SwarmChunkServer) Close ¶
func (s *SwarmChunkServer) Close()
Close needs to be called on a stream server
func (*SwarmChunkServer) SessionIndex ¶
func (s *SwarmChunkServer) SessionIndex() (uint64, error)
SessionIndex returns zero in all cases for SwarmChunkServer.
func (*SwarmChunkServer) SetNextBatch ¶
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch
type SwarmSyncerClient ¶
type SwarmSyncerClient struct {
// contains filtered or unexported fields
}
SwarmSyncerClient
func NewSwarmSyncerClient ¶
func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error)
NewSwarmSyncerClient is a contructor for provable data exchange syncer
func (*SwarmSyncerClient) BatchDone ¶
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)
BatchDone
func (*SwarmSyncerClient) Close ¶
func (s *SwarmSyncerClient) Close()
type SwarmSyncerServer ¶
type SwarmSyncerServer struct {
// contains filtered or unexported fields
}
SwarmSyncerServer implements an Server for history syncing on bins offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin
func NewSwarmSyncerServer ¶
func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error)
NewSwarmSyncerServer is constructor for SwarmSyncerServer
func (*SwarmSyncerServer) Close ¶
func (s *SwarmSyncerServer) Close()
Close needs to be called on a stream server
func (*SwarmSyncerServer) SessionIndex ¶
func (s *SwarmSyncerServer) SessionIndex() (uint64, error)
SessionIndex returns current storage bin (po) index.
func (*SwarmSyncerServer) SetNextBatch ¶
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)
GetBatch retrieves the next batch of hashes from the dbstore
type Takeover ¶
type Takeover Handover
Takeover represents a statement that downstream peer took over (stored all data) handed over
type TakeoverProof ¶
TakeoverProof represents a signed statement that the downstream peer took over
the stream section
type TakeoverProofMsg ¶
type TakeoverProofMsg TakeoverProof
TakeoverProofMsg is the protocol msg sent by downstream peer
func (TakeoverProofMsg) String ¶
func (m TakeoverProofMsg) String() string
String pretty prints TakeoverProofMsg
type TrafficLoad ¶
type TrafficLoad struct {
// contains filtered or unexported fields
}
type UnsubscribeMsg ¶
type UnsubscribeMsg struct {
Stream Stream
}
type WantedHashesMsg ¶
type WantedHashesMsg struct { Stream Stream Want []byte // bitvector indicating which keys of the batch needed 当前想要的哈希的位映射表 From, To uint64 // next interval offset - empty if not to be continued 下一个要检查的区域 }
WantedHashesMsg is the protocol msg data for signaling which hashes offered in OfferedHashesMsg downstream peer actually wants sent over
func (WantedHashesMsg) String ¶
func (m WantedHashesMsg) String() string
String pretty prints WantedHashesMsg