stream

package
v0.4.2
Published: Jun 28, 2019 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	Low uint8 = iota
	Mid
	High
	Top
	PriorityQueue    = 4    // number of priority queues - Low, Mid, High, Top
	PriorityQueueCap = 4096 // queue capacity
	HashSize         = 32
)
const (
	BatchSize = 128
)

Variables

var ErrMaxPeerServers = errors.New("max peer servers")

ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.
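
A sentinel error like this is usually checked with errors.Is on the receiving side. The following is a minimal sketch of how a per-peer server limit could be enforced, not the package's implementation: peerServers, add and isLimitErr are hypothetical names, and treating a limit of 0 as "unlimited" is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxPeerServers mirrors the exported sentinel error documented above.
var errMaxPeerServers = errors.New("max peer servers")

// peerServers is a hypothetical stand-in for per-peer server bookkeeping.
type peerServers struct {
	max     int // assumption of this sketch: 0 means no limit
	servers map[string]struct{}
}

// add registers a server for the stream, or fails once the limit is reached.
func (p *peerServers) add(stream string) error {
	if _, ok := p.servers[stream]; ok {
		return fmt.Errorf("stream %q already registered", stream)
	}
	if p.max > 0 && len(p.servers) >= p.max {
		return errMaxPeerServers
	}
	p.servers[stream] = struct{}{}
	return nil
}

// isLimitErr reports whether err is (or wraps) the limit error.
func isLimitErr(err error) bool { return errors.Is(err, errMaxPeerServers) }

func main() {
	p := &peerServers{max: 2, servers: map[string]struct{}{}}
	for _, s := range []string{"stream-a", "stream-b", "stream-c"} {
		if err := p.add(s); err != nil {
			// In the real protocol the error text would travel in a SubscribeErrorMsg.
			fmt.Println("subscribe error:", err)
		}
	}
}
```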

Functions

func FormatSyncBinKey

func FormatSyncBinKey(bin uint8) string

FormatSyncBinKey returns a string representation of the Kademlia bin number, to be used as the key for a SYNC stream.

func ParseSyncBinKey

func ParseSyncBinKey(s string) (uint8, error)

ParseSyncBinKey parses the string representation and returns the Kademlia bin number.
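
The two functions are inverses of each other. The sketch below shows one way such a round trip could work with the standard strconv package. The numeric base (keyBase) is an assumption made for illustration, since the base the package actually uses is not stated here, and formatBinKey and parseBinKey are hypothetical local names.

```go
package main

import (
	"fmt"
	"strconv"
)

// keyBase is an assumption of this sketch, not the package's documented value.
const keyBase = 10

// formatBinKey renders a bin number as a string key.
func formatBinKey(bin uint8) string {
	return strconv.FormatUint(uint64(bin), keyBase)
}

// parseBinKey is the inverse of formatBinKey.
func parseBinKey(s string) (uint8, error) {
	// bitSize 8 makes ParseUint reject values that do not fit in a uint8.
	v, err := strconv.ParseUint(s, keyBase, 8)
	if err != nil {
		return 0, err
	}
	return uint8(v), nil
}

func main() {
	key := formatBinKey(7)
	bin, err := parseBinKey(key)
	fmt.Println(key, bin, err)

	// A value outside the uint8 range is reported as an error.
	_, err = parseBinKey("256")
	fmt.Println(err != nil)
}
```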

func RegisterSwarmSyncerClient

func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore)

RegisterSwarmSyncerClient registers the client constructor function to handle incoming sync streams.

func RegisterSwarmSyncerServer

func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

func NewAPI

func NewAPI(r *Registry) *API

func (*API) GetPeerServerSubscriptions

func (api *API) GetPeerServerSubscriptions() map[string][]string

GetPeerServerSubscriptions is an API function that queries a peer for the stream subscriptions it has. It can be called via RPC. It returns a map from node IDs to arrays of string representations of Stream objects.

func (*API) SubscribeStream

func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error

func (*API) UnsubscribeStream

func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error

type ChunkDeliveryMsg

type ChunkDeliveryMsg struct {
	Addr  storage.Address
	SData []byte // the stored chunk Data (incl size)
	// contains filtered or unexported fields
}

Chunk delivery always uses the same message type....

type ChunkDeliveryMsgRetrieval

type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg

ChunkDeliveryMsgRetrieval defines a chunk delivery for retrieval (with accounting).

type ChunkDeliveryMsgSyncing

type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

ChunkDeliveryMsgSyncing defines a chunk delivery for syncing (without accounting).

type Client

type Client interface {
	NeedData(context.Context, []byte) (bool, func(context.Context) error)
	Close()
}

Client interface for incoming peer Streamer
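
To make the shape of NeedData concrete, here is a small in-memory implementation of the interface. It is a sketch under stated assumptions, not how the package's own clients work: memClient and deliver are hypothetical, a nil wait function is taken to mean "nothing to wait for", and the returned boolean is taken to mean "another caller already registered interest in this key".

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// client mirrors the Client interface documented above.
type client interface {
	NeedData(context.Context, []byte) (bool, func(context.Context) error)
	Close()
}

// memClient is a hypothetical in-memory client used only for illustration.
type memClient struct {
	mu      sync.Mutex
	stored  map[string]bool
	pending map[string]chan struct{}
}

var _ client = (*memClient)(nil)

func newMemClient() *memClient {
	return &memClient{stored: map[string]bool{}, pending: map[string]chan struct{}{}}
}

// NeedData returns a nil wait function when the key is already stored.
// Otherwise wait blocks until deliver is called for the key or ctx ends.
func (c *memClient) NeedData(_ context.Context, key []byte) (bool, func(context.Context) error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	k := string(key)
	if c.stored[k] {
		return false, nil
	}
	ch, loaded := c.pending[k]
	if !loaded {
		ch = make(chan struct{})
		c.pending[k] = ch
	}
	return loaded, func(ctx context.Context) error {
		select {
		case <-ch:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// deliver marks the key as stored and releases every waiter.
func (c *memClient) deliver(key []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	k := string(key)
	c.stored[k] = true
	if ch, ok := c.pending[k]; ok {
		close(ch)
		delete(c.pending, k)
	}
}

func (c *memClient) Close() {}

// runDemo waits for one delivery, then asks again for the same key.
func runDemo() (waitErr error, secondWaitNil bool) {
	c := newMemClient()
	defer c.Close()
	key := []byte("chunk-address")
	_, wait := c.NeedData(context.Background(), key)
	go c.deliver(key)
	waitErr = wait(context.Background())
	_, again := c.NeedData(context.Background(), key)
	return waitErr, again == nil
}

func main() {
	fmt.Println(runDemo())
}
```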

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery

func (*Delivery) Close

func (d *Delivery) Close()

func (*Delivery) FindPeer added in v0.4.2

func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, error)

FindPeer returns the closest peer from Kademlia to which a chunk request has not already been sent.

func (*Delivery) RequestFromPeers

func (d *Delivery) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error)

RequestFromPeers sends a chunk retrieve request to the next found peer

type Handover

type Handover struct {
	Stream     Stream // name of stream
	Start, End uint64 // index of hashes
	Root       []byte // Root hash for indexed segment inclusion proofs
}

Handover represents a statement that the upstream peer hands over the stream section

type OfferedHashesMsg

type OfferedHashesMsg struct {
	Stream   Stream // name of Stream
	From, To uint64 // peer and db-specific entry count
	Hashes   []byte // stream of hashes (128)
}

OfferedHashesMsg is the protocol msg for offering to hand over a stream section
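
Because Hashes is a flat byte stream, a receiver has to cut it into HashSize-byte pieces before inspecting individual hashes. A minimal sketch of that step follows; splitHashes is a hypothetical helper, not a function of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// hashSize mirrors the HashSize constant documented above.
const hashSize = 32

// splitHashes cuts a concatenated hash stream into individual hashes.
// The returned slices alias the input rather than copying it.
func splitHashes(hashes []byte) ([][]byte, error) {
	if len(hashes)%hashSize != 0 {
		return nil, errors.New("hash stream length is not a multiple of the hash size")
	}
	out := make([][]byte, 0, len(hashes)/hashSize)
	for i := 0; i < len(hashes); i += hashSize {
		out = append(out, hashes[i:i+hashSize])
	}
	return out, nil
}

func main() {
	// Three fake hashes whose bytes all equal their position in the stream.
	stream := make([]byte, 3*hashSize)
	for i := range stream {
		stream[i] = byte(i / hashSize)
	}
	hs, err := splitHashes(stream)
	fmt.Println(len(hs), hs[2][0], err)
}
```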

func (OfferedHashesMsg) String

func (m OfferedHashesMsg) String() string

String pretty prints OfferedHashesMsg

type Peer

type Peer struct {
	*network.BzzPeer
	// contains filtered or unexported fields
}

Peer is the Peer extension for the streaming protocol

func NewPeer

func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer

NewPeer is the constructor for Peer

func (*Peer) Deliver

func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error

Deliver sends a storeRequestMsg protocol message to the peer. Depending on the `syncing` parameter, a different message type is sent.

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

HandleMsg is the message handler that delegates incoming messages

func (*Peer) SendOfferedHashes

func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error

SendOfferedHashes sends OfferedHashesMsg protocol msg

func (*Peer) SendPriority

func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error

SendPriority sends message to the peer using the outgoing priority queue
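
The idea of an outgoing priority queue can be sketched with one buffered channel per priority level, drained from Top down to Low. This is an illustration only and is not taken from the package's implementation: priorityQueues, push, pop and errContention are hypothetical names, and the non-blocking push is a design choice of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the priority constants documented at the top of the page.
const (
	low uint8 = iota
	mid
	high
	top
	numQueues = 4    // PriorityQueue
	queueCap  = 4096 // PriorityQueueCap
)

// errContention is a hypothetical error for a full queue.
var errContention = errors.New("queue full")

// priorityQueues holds one buffered channel per priority level.
type priorityQueues struct {
	queues [numQueues]chan interface{}
}

func newPriorityQueues() *priorityQueues {
	pq := &priorityQueues{}
	for i := range pq.queues {
		pq.queues[i] = make(chan interface{}, queueCap)
	}
	return pq
}

// push enqueues msg without blocking and fails if the queue is full.
func (pq *priorityQueues) push(msg interface{}, priority uint8) error {
	if int(priority) >= len(pq.queues) {
		return fmt.Errorf("invalid priority %d", priority)
	}
	select {
	case pq.queues[priority] <- msg:
		return nil
	default:
		return errContention
	}
}

// pop returns the next message from the highest non-empty queue.
func (pq *priorityQueues) pop() (interface{}, bool) {
	for p := len(pq.queues) - 1; p >= 0; p-- {
		select {
		case msg := <-pq.queues[p]:
			return msg, true
		default:
		}
	}
	return nil, false
}

// drainOrder pushes three messages and returns the order they come out in.
func drainOrder() []string {
	pq := newPriorityQueues()
	pq.push("sync offer", low)
	pq.push("retrieve request", top)
	pq.push("chunk delivery", high)
	var order []string
	for {
		msg, ok := pq.pop()
		if !ok {
			break
		}
		order = append(order, msg.(string))
	}
	return order
}

func main() {
	fmt.Println(drainOrder())
}
```

Messages pushed at a higher priority leave the queue first, regardless of insertion order.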

type QuitMsg

type QuitMsg struct {
	Stream Stream
}

type Range

type Range struct {
	From, To uint64
}

func NewRange

func NewRange(from, to uint64) *Range

func (*Range) String

func (r *Range) String() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the registry for outgoing and incoming streamer constructors.

func NewRegistry

func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry

NewRegistry is the Streamer constructor.

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) Close

func (r *Registry) Close() error

func (*Registry) GetClientFunc

func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)

GetClientFunc is the accessor for incoming streamer constructors.

func (*Registry) GetServerFunc

func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)

GetServerFunc is the accessor for outgoing streamer constructors.

func (*Registry) GetSpec

func (r *Registry) GetSpec() *protocols.Spec

GetSpec returns the streamer spec to callers. This used to be a global variable, but in simulations with multiple nodes its fields (notably the Hook) would be overwritten.

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Quit

func (r *Registry) Quit(peerId enode.ID, s Stream) error

Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.

func (*Registry) RegisterClientFunc

func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))

RegisterClientFunc registers an incoming streamer constructor.

func (*Registry) RegisterServerFunc

func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))

RegisterServerFunc registers an outgoing streamer constructor.

func (*Registry) RequestSubscription

func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error

func (*Registry) Run

func (r *Registry) Run(p *network.BzzPeer) error

Run is the protocol run function.

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

func (*Registry) Subscribe

func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error

Subscribe initiates the streamer

func (*Registry) Unsubscribe

func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error

type RegistryOptions

type RegistryOptions struct {
	SkipCheck       bool
	Syncing         SyncingOption // Defines syncing behavior
	SyncUpdateDelay time.Duration
	MaxPeerServers  int // The limit of servers for each peer in registry
}

RegistryOptions holds optional values for NewRegistry constructor.

type RequestSubscriptionMsg

type RequestSubscriptionMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream

type RetrieveRequestMsg

type RetrieveRequestMsg struct {
	Addr storage.Address
}

RetrieveRequestMsg is the protocol msg for chunk retrieve requests

type Server

type Server interface {
	// SessionIndex is called when a server is initialized
	// to get the current cursor state of the stream data.
	// Based on this index, live and history stream intervals
	// will be adjusted before calling SetNextBatch.
	SessionIndex() (uint64, error)
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
	GetData(context.Context, []byte) ([]byte, error)
	Close()
}

Server interface for outgoing peer Streamer

type Stream

type Stream struct {
	// Name is used for Client and Server functions identification.
	Name string
	// Key is the name of specific stream data.
	Key string
	// Live defines whether the stream delivers only new data
	// for the specific stream.
	Live bool
}

Stream defines a unique stream identifier.

func NewStream

func NewStream(name string, key string, live bool) Stream

func (Stream) String

func (s Stream) String() string

String returns a stream ID based on all Stream fields.
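
Deriving the identifier from all three fields means that a live stream and a history stream with the same name and key get different IDs. The sketch below illustrates this property. The separator and the "l"/"h" suffix are assumptions of the sketch rather than a documented format, and stream and id are hypothetical local names.

```go
package main

import "fmt"

// stream mirrors the fields of the Stream type documented above.
type stream struct {
	Name string
	Key  string
	Live bool
}

// id builds an identifier from every field. The exact layout is an
// assumption of this sketch, not the package's documented format.
func (s stream) id() string {
	kind := "h" // history
	if s.Live {
		kind = "l" // live
	}
	return fmt.Sprintf("%s|%s|%s", s.Name, s.Key, kind)
}

func main() {
	live := stream{Name: "SYNC", Key: "6", Live: true}
	hist := stream{Name: "SYNC", Key: "6", Live: false}
	fmt.Println(live.id(), hist.id(), live.id() != hist.id())
}
```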

type StreamerPrices

type StreamerPrices struct {
	// contains filtered or unexported fields
}

An accountable message needs some meta information attached to it in order to evaluate the correct price

func (*StreamerPrices) Price

func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price

Price implements the accounting interface and returns the price for a specific message

type SubscribeErrorMsg

type SubscribeErrorMsg struct {
	Error string
}

type SubscribeMsg

type SubscribeMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

SubscribeMsg is the protocol msg for requesting a stream (section)

type SwarmSyncerClient

type SwarmSyncerClient struct {
	// contains filtered or unexported fields
}

SwarmSyncerClient

func NewSwarmSyncerClient

func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error)

NewSwarmSyncerClient is a constructor for the provable data exchange syncer.

func (*SwarmSyncerClient) Close

func (s *SwarmSyncerClient) Close()

func (*SwarmSyncerClient) NeedData

func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (loaded bool, wait func(context.Context) error)

type SwarmSyncerServer

type SwarmSyncerServer struct {
	// contains filtered or unexported fields
}

SwarmSyncerServer implements a Server for history syncing on bins. Offered streams:

* live request delivery with or without checkback
* (live/non-live historical) chunk syncing per proximity bin

func NewSwarmSyncerServer

func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error)

NewSwarmSyncerServer is the constructor for SwarmSyncerServer.

func (*SwarmSyncerServer) Close

func (s *SwarmSyncerServer) Close()

Close needs to be called on a stream server

func (*SwarmSyncerServer) GetData

func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData retrieves the actual chunk from netstore

func (*SwarmSyncerServer) SessionIndex

func (s *SwarmSyncerServer) SessionIndex() (uint64, error)

SessionIndex returns current storage bin (po) index.

func (*SwarmSyncerServer) SetNextBatch

func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error)

SetNextBatch retrieves the next batch of hashes from the localstore. It expects a range of bin IDs, both ends inclusive in syncing, and returns a concatenated byte slice of chunk addresses together with the bin IDs of the first and the last chunk in that slice. The batch may hold up to BatchSize chunk addresses. If at least one chunk has been added to the batch and no new chunk arrives within the batchTimeout period, the batch is returned. This function blocks until new chunks are received from the localstore pull subscription.
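
The batching rule described above (return when the batch is full, or when at least one item is present and a quiet period elapses) can be sketched with a channel and a timer. This is an illustration, not the package's code: nextBatch is a hypothetical helper, the items are plain bin IDs rather than chunk addresses, and the timeout is passed in because the value of batchTimeout is not given here.

```go
package main

import (
	"fmt"
	"time"
)

// batchSize mirrors the BatchSize constant documented above.
const batchSize = 128

// nextBatch collects items until the batch is full, the channel is closed,
// or no new item has arrived for batchTimeout after at least one was added.
func nextBatch(items <-chan uint64, batchTimeout time.Duration) []uint64 {
	var batch []uint64
	// timeout stays nil until the first item arrives, so that an empty
	// batch blocks instead of timing out.
	var timeout <-chan time.Time
	for {
		select {
		case id, ok := <-items:
			if !ok {
				return batch
			}
			batch = append(batch, id)
			if len(batch) == batchSize {
				return batch
			}
			// Restart the quiet period after every new item.
			timeout = time.After(batchTimeout)
		case <-timeout:
			return batch
		}
	}
}

// demoFull offers more items than fit into one batch.
func demoFull() int {
	ch := make(chan uint64, 200)
	for i := uint64(1); i <= 200; i++ {
		ch <- i
	}
	return len(nextBatch(ch, time.Second))
}

// demoPartial offers three items and then goes quiet.
func demoPartial() int {
	ch := make(chan uint64, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	return len(nextBatch(ch, 20*time.Millisecond))
}

func main() {
	fmt.Println(demoFull(), demoPartial())
}
```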

type SyncingOption

type SyncingOption int

Enumerate options for syncing and retrieval

const (
	// Syncing disabled
	SyncingDisabled SyncingOption = iota
	// Register the client and the server but not subscribe
	SyncingRegisterOnly
	// Both client and server funcs are registered, subscribe sent automatically
	SyncingAutoSubscribe
)

Syncing options

type Takeover

type Takeover Handover

Takeover represents a statement that the downstream peer took over (stored) all the data handed over

type TakeoverProof

type TakeoverProof struct {
	Sig []byte // Sign(Hash(Serialisation(Takeover)))
	*Takeover
}

TakeoverProof represents a signed statement that the downstream peer took over the stream section
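
The comment on Sig describes a sign-the-hash-of-the-serialisation scheme. The sketch below walks through that pattern using only the standard library. Every concrete choice in it is a substitute picked for illustration: a length-prefixed binary encoding for the serialisation, SHA-256 for the hash, and Ed25519 for the signature. The package's actual encoding, hash function and signature scheme are not stated here, and takeover, serialise, sign and verify are hypothetical names.

```go
package main

import (
	"bytes"
	"crypto/ed25519"
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// takeover is a simplified stand-in for the Takeover type; the stream is
// reduced to a string for this sketch.
type takeover struct {
	Stream     string
	Start, End uint64
	Root       []byte
}

// serialise encodes the statement with length-prefixed fields.
// This encoding is an assumption of the sketch.
func serialise(t takeover) []byte {
	var buf bytes.Buffer
	var n [8]byte
	putUint := func(v uint64) {
		binary.BigEndian.PutUint64(n[:], v)
		buf.Write(n[:])
	}
	putBytes := func(b []byte) {
		putUint(uint64(len(b)))
		buf.Write(b)
	}
	putBytes([]byte(t.Stream))
	putUint(t.Start)
	putUint(t.End)
	putBytes(t.Root)
	return buf.Bytes()
}

// sign computes Sign(Hash(Serialisation(takeover))).
func sign(priv ed25519.PrivateKey, t takeover) []byte {
	digest := sha256.Sum256(serialise(t))
	return ed25519.Sign(priv, digest[:])
}

// verify recomputes the digest and checks the signature against it.
func verify(pub ed25519.PublicKey, t takeover, sig []byte) bool {
	digest := sha256.Sum256(serialise(t))
	return ed25519.Verify(pub, digest[:], sig)
}

// demo signs a statement, then checks the original and a modified copy.
func demo() (valid, tampered bool) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return false, false
	}
	t := takeover{Stream: "example-stream", Start: 1, End: 128, Root: []byte{0xab}}
	sig := sign(priv, t)
	valid = verify(pub, t, sig)
	t.End = 129 // any change to the statement invalidates the signature
	tampered = verify(pub, t, sig)
	return valid, tampered
}

func main() {
	fmt.Println(demo())
}
```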

type TakeoverProofMsg

type TakeoverProofMsg TakeoverProof

TakeoverProofMsg is the protocol msg sent by downstream peer

func (TakeoverProofMsg) String

func (m TakeoverProofMsg) String() string

String pretty prints TakeoverProofMsg

type UnsubscribeMsg

type UnsubscribeMsg struct {
	Stream Stream
}

type WantedHashesMsg

type WantedHashesMsg struct {
	Stream   Stream
	Want     []byte // bitvector indicating which keys of the batch needed
	From, To uint64 // next interval offset - empty if not to be continued
}

WantedHashesMsg is the protocol msg data for signaling which of the hashes offered in an OfferedHashesMsg the downstream peer actually wants sent over
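
A bitvector keeps this reply compact: one bit per offered hash instead of echoing the hashes back. The sketch below builds such a vector for a batch. The bit order (least significant bit first within each byte) is an assumption of the sketch, and wantBits and isWanted are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// hashSize mirrors the HashSize constant documented above.
const hashSize = 32

// wantBits sets bit i when the i-th offered hash is not held locally.
// The bit layout is an assumption of this sketch.
func wantBits(hashes []byte, have func([]byte) bool) []byte {
	n := len(hashes) / hashSize
	want := make([]byte, (n+7)/8)
	for i := 0; i < n; i++ {
		h := hashes[i*hashSize : (i+1)*hashSize]
		if !have(h) {
			want[i/8] |= 1 << uint(i%8)
		}
	}
	return want
}

// isWanted reads bit i back out of the vector.
func isWanted(want []byte, i int) bool {
	return want[i/8]&(1<<uint(i%8)) != 0
}

// demoWant offers ten fake hashes and pretends that the even-numbered
// ones are already stored locally.
func demoWant() []byte {
	hashes := make([]byte, 10*hashSize)
	for i := 0; i < 10; i++ {
		hashes[i*hashSize] = byte(i)
	}
	have := func(h []byte) bool { return h[0]%2 == 0 }
	return wantBits(hashes, have)
}

func main() {
	want := demoWant()
	for i := 0; i < 10; i++ {
		fmt.Print(isWanted(want, i), " ")
	}
	fmt.Println()
}
```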

func (WantedHashesMsg) String

func (m WantedHashesMsg) String() string

String pretty prints WantedHashesMsg

type WrappedPriorityMsg

type WrappedPriorityMsg struct {
	Context context.Context
	Msg     interface{}
}
