stream

package
v0.0.0-...-b78b3a4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2019 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Low uint8 = iota
	Mid
	High
	Top
	PriorityQueue         //
	PriorityQueueCap = 32 //
	HashSize         = 32
)
View Source
const (
	//
	BatchSize = 128
)

Variables

View Source
var Spec = &protocols.Spec{
	Name:       "stream",
	Version:    5,
	MaxMsgSize: 10 * 1024 * 1024,
	Messages: []interface{}{
		UnsubscribeMsg{},
		OfferedHashesMsg{},
		WantedHashesMsg{},
		TakeoverProofMsg{},
		SubscribeMsg{},
		RetrieveRequestMsg{},
		ChunkDeliveryMsg{},
		SubscribeErrorMsg{},
		RequestSubscriptionMsg{},
		QuitMsg{},
	},
}

Functions

func FormatSyncBinKey

func FormatSyncBinKey(bin uint8) string

func ParseSyncBinKey

func ParseSyncBinKey(s string) (uint8, error)

func RegisterSwarmSyncerClient

func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI)

func RegisterSwarmSyncerServer

func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

func NewAPI

func NewAPI(r *Registry) *API

func (*API) SubscribeStream

func (api *API) SubscribeStream(peerId discover.NodeID, s Stream, history *Range, priority uint8) error

func (*API) UnsubscribeStream

func (api *API) UnsubscribeStream(peerId discover.NodeID, s Stream) error

type ChunkDeliveryMsg

type ChunkDeliveryMsg struct {
	Addr  storage.Address
	SData []byte //
	// contains filtered or unexported fields
}

type Client

type Client interface {
	NeedData(context.Context, []byte) func()
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
	Close()
}

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(overlay network.Overlay, db *storage.DBAPI) *Delivery

func (*Delivery) RequestFromPeers

func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error

type Handover

type Handover struct {
	Stream     Stream //
	Start, End uint64 //
	Root       []byte //
}

type HandoverProof

type HandoverProof struct {
	Sig []byte //
	*Handover
}

type OfferedHashesMsg

type OfferedHashesMsg struct {
	Stream         Stream //
	From, To       uint64 //
	Hashes         []byte //
	*HandoverProof        //
}

func (OfferedHashesMsg) String

func (m OfferedHashesMsg) String() string

type Peer

type Peer struct {
	*protocols.Peer
	// contains filtered or unexported fields
}

func NewPeer

func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer

func (*Peer) Deliver

func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

func (*Peer) SendOfferedHashes

func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error

func (*Peer) SendPriority

func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error

type QuitMsg

type QuitMsg struct {
	Stream Stream
}

type Range

type Range struct {
	From, To uint64
}

func NewRange

func NewRange(from, to uint64) *Range

func (*Range) String

func (r *Range) String() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) Close

func (r *Registry) Close() error

func (*Registry) GetClientFunc

func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)

func (*Registry) GetServerFunc

func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)

func (*Registry) NodeInfo

func (r *Registry) NodeInfo() interface{}

func (*Registry) PeerInfo

func (r *Registry) PeerInfo(id discover.NodeID) interface{}

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Quit

func (r *Registry) Quit(peerId discover.NodeID, s Stream) error

func (*Registry) RegisterClientFunc

func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))

func (*Registry) RegisterServerFunc

func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))

func (*Registry) RequestSubscription

func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Range, prio uint8) error

func (*Registry) Retrieve

func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error

func (*Registry) Run

func (r *Registry) Run(p *network.BzzPeer) error

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

func (*Registry) Subscribe

func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priority uint8) error

func (*Registry) Unsubscribe

func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error

type RegistryOptions

type RegistryOptions struct {
	SkipCheck       bool
	DoSync          bool
	DoRetrieve      bool
	SyncUpdateDelay time.Duration
}

type RequestSubscriptionMsg

type RequestSubscriptionMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  //
}

type RetrieveRequestMsg

type RetrieveRequestMsg struct {
	Addr      storage.Address
	SkipCheck bool
}

type Server

type Server interface {
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
	GetData(context.Context, []byte) ([]byte, error)
	Close()
}

type Stream

type Stream struct {
	//
	Name string
	//
	Key string
	//
	//
	Live bool
}

func NewStream

func NewStream(name string, key string, live bool) Stream

func (Stream) String

func (s Stream) String() string

type SubscribeErrorMsg

type SubscribeErrorMsg struct {
	Error string
}

type SubscribeMsg

type SubscribeMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  //
}

type SwarmChunkServer

type SwarmChunkServer struct {
	// contains filtered or unexported fields
}

func NewSwarmChunkServer

func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer

func (*SwarmChunkServer) Close

func (s *SwarmChunkServer) Close()

func (*SwarmChunkServer) GetData

func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)

func (*SwarmChunkServer) SetNextBatch

func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)

type SwarmSyncerClient

type SwarmSyncerClient struct {
	// contains filtered or unexported fields
}

func NewSwarmSyncerClient

func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error)

func (*SwarmSyncerClient) BatchDone

func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)

func (*SwarmSyncerClient) Close

func (s *SwarmSyncerClient) Close()

func (*SwarmSyncerClient) NeedData

func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func())

func (*SwarmSyncerClient) TakeoverProof

func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error)

type SwarmSyncerServer

type SwarmSyncerServer struct {
	// contains filtered or unexported fields
}

func NewSwarmSyncerServer

func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error)

func (*SwarmSyncerServer) Close

func (s *SwarmSyncerServer) Close()

func (*SwarmSyncerServer) GetData

func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error)

func (*SwarmSyncerServer) SetNextBatch

func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)

type Takeover

type Takeover Handover

type TakeoverProof

type TakeoverProof struct {
	Sig []byte //
	*Takeover
}

type TakeoverProofMsg

type TakeoverProofMsg TakeoverProof

func (TakeoverProofMsg) String

func (m TakeoverProofMsg) String() string

type UnsubscribeMsg

type UnsubscribeMsg struct {
	Stream Stream
}

type WantedHashesMsg

type WantedHashesMsg struct {
	Stream   Stream
	Want     []byte //
	From, To uint64 //
}

func (WantedHashesMsg) String

func (m WantedHashesMsg) String() string

type WrappedPriorityMsg

type WrappedPriorityMsg struct {
	Context context.Context
	Msg     interface{}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL