sync

package
v0.0.0-...-73c7f38 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2019 License: MIT Imports: 17 Imported by: 0

README

Synchronization

This package synchronizes graphs using libp2p

Motivation and Use Cases

See more information about multi-writer/collaborative data structures and the need for a multi-writer IPNS

See also the proposal for supporting the above use case as part of the set of IPLD replication options

Algorithmic Interface

The graph synchronization algorithm runs on various machines each with DAGs G1, G2, ... GN (that have a common root node) and results in each machine having the same resulting DAG G' which contains the edges and vertices from each of the input graphs.

Current Implementation

The current graph synchronization approach works by having all users send their full graphs to each other and then merge the graphs on their own. Each node in the graph is the tuple (parentCIDs, CID, childCIDs) where the content the CID refers to is the tuple (parentCIDs, dataCID). It's very easy to merge these graphs together since the either nodes that are new between graphs are simply added as new children of their appropriate parents, or if the node exists in both graphs, then any new child edges are added to the existing node.

Look at the test cases in gsync_test.go for examples

There is also a basic pinning service for these graphs available at pinner.go and example usage at pinner_test.go

Documentation

Index

Constants

View Source
const MWIPNSKey = protocol.ID("/mwipns/1.0.0")

Variables

This section is empty.

Functions

func NewGossipSyncMW

func NewGossipSyncMW(ctx context.Context, h host.Host, mcache *MWMessageCache, protocolID protocol.ID, opts ...pubsub.Option) (*pubsub.PubSub, error)

NewGossipBaseSub returns a new PubSub object using GossipSubRouter as the router.

func SendFullGraph

func SendFullGraph(gp OperationDAG, graphID *cid.Cid, ha host.Host, peers []peer.ID) int

Types

type AddNodeOperation

type AddNodeOperation struct {
	Value   *cid.Cid
	Parents []*cid.Cid
}

AddNodeOperation is the basic operation for adding a node to a DAG

func (AddNodeOperation) Marshal

func (msg AddNodeOperation) Marshal() ([]byte, error)

Marshal returns the byte representation of the object

func (*AddNodeOperation) Unmarshal

func (msg *AddNodeOperation) Unmarshal(mk []byte) error

Unmarshal fills the structure with data from the bytes

type AutomaticGraphSynchronizationManager

type AutomaticGraphSynchronizationManager interface {
	GraphSynchronizationManager
	SyncGraph(IPNSKey *cid.Cid)
	GetGraphProvider(IPNSKey cid.Cid) GraphProvider
}

func NewGraphSychronizer

func NewGraphSychronizer(ha host.Host, storage IPNSLocalStorage, rngSrc mrand.Source) AutomaticGraphSynchronizationManager

NewGraphSychronizer Creates a GraphSynchronizationManager that manages the updates to the graphs

type BasicMWIPNS

type BasicMWIPNS struct {
	// contains filtered or unexported fields
}

func (*BasicMWIPNS) AddNewVersion

func (ipns *BasicMWIPNS) AddNewVersion(newCid *cid.Cid, prevCids ...*cid.Cid)

AddNewVersion modify the object into a new version based on the previous modifications it depends on

func (*BasicMWIPNS) GetLatestVersionHistories

func (ipns *BasicMWIPNS) GetLatestVersionHistories() []DagNode

GetLatestVersionHistories Each DagNode returned represents one possible version of the data and the history leading up to it

func (*BasicMWIPNS) GetNumberOfOperations

func (ipns *BasicMWIPNS) GetNumberOfOperations() int

GetNumberOfOperations returns the number of Dag nodes/operations processed

func (*BasicMWIPNS) GetRoot

func (ipns *BasicMWIPNS) GetRoot() DagNode

GetRoot returns the root DagNode

type DagNode

type DagNode interface {
	GetNodeID() cid.Cid // Content format for the Cid is {ParentCids[], DataCid}
	GetValue() cid.Cid  // Returns the Cid of the content
	GetParents() map[cid.Cid]DagNode
	GetChildren() map[cid.Cid]DagNode
	AddChildren(...DagNode) int // Returns the number of children added
	GetAsOp() *AddNodeOperation
}

DagNode is a DAG node where both the nodes and their content are described by Cids

type FullSendGSync

type FullSendGSync struct {
	GraphID    *cid.Cid
	Operations []*AddNodeOperation
}

FullSendGSync is a (large) message that sends a peer's full state about the graph being synchronized

func (FullSendGSync) Marshal

func (msg FullSendGSync) Marshal() ([]byte, error)

Marshal returns the byte representation of the object

func (*FullSendGSync) Unmarshal

func (msg *FullSendGSync) Unmarshal(mk []byte) error

Unmarshal fills the structure with data from the bytes

type GSMultiWriterIPNS

type GSMultiWriterIPNS struct {
	BasicMWIPNS
	IPNSKey cid.Cid
	Gsync   GraphSynchronizationManager
}

GSMultiWriterIPNS provides a MultiWriterIPNS layer on top of a GraphSynchronizer

func (*GSMultiWriterIPNS) GetKey

func (ipns *GSMultiWriterIPNS) GetKey() cid.Cid

type GSyncMessage

type GSyncMessage struct {
	MessageType GSyncState
	Msg         []byte
}

GSyncMessage is the container for all messages in the gsync protocol

func (GSyncMessage) Marshal

func (msg GSyncMessage) Marshal() ([]byte, error)

Marshal returns the byte representation of the object

func (*GSyncMessage) Unmarshal

func (msg *GSyncMessage) Unmarshal(mk []byte) error

Unmarshal fills the structure with data from the bytes

type GSyncState

type GSyncState int

GSyncState The state of the Graph Syncronization algorithm

const (
	UNKNOWN GSyncState = iota
	FULL_GRAPH
	UPDATE
	REQUEST_FULL_GRAPH
)

The possible states of the Graph Syncronization algorithm

type GossipMultiWriterIPNS

type GossipMultiWriterIPNS interface {
	MultiWriterIPNS
	GetRoot() DagNode
	GetNumberOfOperations() int
	GetKey() cid.Cid
}

func NewGossipMultiWriterIPNS

func NewGossipMultiWriterIPNS(IPNSKey cid.Cid, Gsync GraphSynchronizationManager) GossipMultiWriterIPNS

type GraphProvider

type GraphProvider interface {
	OperationDAG
	Update(*AddNodeOperation)
	SyncGraph()
}

GraphProvider Manages a graph of operations, including broadcasting and receiving updates

type GraphSynchronizationManager

type GraphSynchronizationManager interface {
	GetGraph(IPNSKey cid.Cid) OperationDAG
	AddGraph(IPNSKey cid.Cid)
	RemoveGraph(IPNSKey cid.Cid)
}

GraphSynchronizationManager manages the synchronization of multiple graphs

type GraphUpdater

type GraphUpdater func(*AddNodeOperation)

type IPNSLocalStorage

type IPNSLocalStorage interface {
	GetIPNSKeys() []cid.Cid
	GetPeers(IPNSKey cid.Cid) []peer.ID
	GetOps(IPNSKey cid.Cid) []*AddNodeOperation
}

IPNSLocalStorage is a read interface for data that an MWIPNS node might need

type MWIPNSPinner

type MWIPNSPinner struct {
	Synchronizer GraphSynchronizationManager
	Storage      UpdateableIPNSLocalStorage
	// contains filtered or unexported fields
}

MWIPNSPinner handles node that can be registered with to persist MWIPNS structures

func NewPinner

func NewPinner(h host.Host) *MWIPNSPinner

NewPinner instantiates an MWIPNSPinner

type MWMessageCache

type MWMessageCache struct {
	ComputeID func(msg *pb.Message) string

	IDMsgMap map[string]*pb.Message
	// contains filtered or unexported fields
}

func NewMWMessageCache

func NewMWMessageCache(ComputeID func(msg *pb.Message) string) *MWMessageCache

func (*MWMessageCache) Get

func (mc *MWMessageCache) Get(mid string) (*pb.Message, bool)

func (*MWMessageCache) GetGossipIDs

func (mc *MWMessageCache) GetGossipIDs(topic string) []string

func (*MWMessageCache) Put

func (mc *MWMessageCache) Put(msg *pb.Message) map[string]struct{}

func (*MWMessageCache) Shift

func (mc *MWMessageCache) Shift()

type ManualGraphSynchronizationManager

type ManualGraphSynchronizationManager interface {
	GraphSynchronizationManager
	SyncGraph(IPNSKey *cid.Cid, peer peer.ID)
}

func NewManualGraphSychronizer

func NewManualGraphSychronizer(ha host.Host) ManualGraphSynchronizationManager

type MultiWriterIPNS

type MultiWriterIPNS interface {
	AddNewVersion(newCid *cid.Cid, prevCids ...*cid.Cid)
	GetLatestVersionHistories() []DagNode // Each DagNode returned represents one possible version of the data and the history leading up to it
}

MultiWriterIPNS supports multiwriter modification of an object where the modifications are represented by DAG nodes containing Cids of operations on the object

type OpBasedDagNode

type OpBasedDagNode struct {
	Children map[cid.Cid]DagNode
	Parents  map[cid.Cid]DagNode
	// contains filtered or unexported fields
}

OpBasedDagNode is a DAG node wherein every node is an operation on the graph's state

func (*OpBasedDagNode) AddChildren

func (node *OpBasedDagNode) AddChildren(nodes ...DagNode) int

AddChildren returns the number of children added

func (*OpBasedDagNode) GetAsOp

func (node *OpBasedDagNode) GetAsOp() *AddNodeOperation

GetAsOp returns the operation that created the node

func (*OpBasedDagNode) GetChildren

func (node *OpBasedDagNode) GetChildren() map[cid.Cid]DagNode

GetChildren returns a map of Cid -> DagNode. Do not modify

func (*OpBasedDagNode) GetNodeID

func (node *OpBasedDagNode) GetNodeID() cid.Cid

GetNodeID returns the Cid of the node (as opposed to the node's content)

func (*OpBasedDagNode) GetParents

func (node *OpBasedDagNode) GetParents() map[cid.Cid]DagNode

GetParents returns a map of Cid -> DagNode. Do not modify

func (*OpBasedDagNode) GetValue

func (node *OpBasedDagNode) GetValue() cid.Cid

GetValue returns the Cid of the node's value (as opposed to the node itself)

type OperationDAG

type OperationDAG interface {
	MultiWriterIPNS
	ReceiveUpdates(...*AddNodeOperation)
	GetOps() []*AddNodeOperation
	GetDagNodes() map[cid.Cid]DagNode
	TryGetNode(nodeID cid.Cid) (DagNode, bool)
	GetRoot() DagNode
}

type PubSubMWIPNS

type PubSubMWIPNS struct {
	// contains filtered or unexported fields
}

func (*PubSubMWIPNS) AddNewVersion

func (gs *PubSubMWIPNS) AddNewVersion(newCid *cid.Cid, prevCids ...*cid.Cid)

func (*PubSubMWIPNS) GetKey

func (ipns *PubSubMWIPNS) GetKey() cid.Cid

func (*PubSubMWIPNS) GetLatestVersionHistories

func (gs *PubSubMWIPNS) GetLatestVersionHistories() []DagNode

func (*PubSubMWIPNS) GetNumberOfOperations

func (gs *PubSubMWIPNS) GetNumberOfOperations() int

func (*PubSubMWIPNS) GetRoot

func (gs *PubSubMWIPNS) GetRoot() DagNode

type RegisterGraph

type RegisterGraph struct {
	GraphID cid.Cid
	RootCID cid.Cid
	Peers   []peer.ID
}

RegisterGraph is the RPC structure for registering an append-only DAG with an MWIPNSPinner

func (RegisterGraph) Marshal

func (reg RegisterGraph) Marshal() ([]byte, error)

Marshal returns the byte representation of the object

func (*RegisterGraph) Unmarshal

func (reg *RegisterGraph) Unmarshal(mk []byte) error

Unmarshal fills the structure with data from the bytes

type RemotePinner

type RemotePinner struct {
	ID peer.ID
	// contains filtered or unexported fields
}

RemotePinner handles remote calls to a MWIPNSPinner node

func (*RemotePinner) RegisterGraph

func (pinner *RemotePinner) RegisterGraph(graphID cid.Cid, peers []peer.ID) error

RegisterGraph registers a graph with the MWIPNSPinner

type RequestFullSendGSync

type RequestFullSendGSync struct {
	GraphID *cid.Cid
}

FullSendGSync is a (large) message that sends a peer's full state about the graph being synchronized

func (RequestFullSendGSync) Marshal

func (msg RequestFullSendGSync) Marshal() ([]byte, error)

Marshal returns the byte representation of the object

func (*RequestFullSendGSync) Unmarshal

func (msg *RequestFullSendGSync) Unmarshal(mk []byte) error

Unmarshal fills the structure with data from the bytes

type SyncGossipConfiguration

type SyncGossipConfiguration struct {
	// contains filtered or unexported fields
}

func (*SyncGossipConfiguration) GetCacher

func (*SyncGossipConfiguration) Protocol

func (gs *SyncGossipConfiguration) Protocol() protocol.ID

func (*SyncGossipConfiguration) Publish

func (*SyncGossipConfiguration) SupportedProtocols

func (gs *SyncGossipConfiguration) SupportedProtocols() []protocol.ID

type SynchronizeCompatibleMessageCacher

type SynchronizeCompatibleMessageCacher interface {
	pubsub.MessageCacheReader
	Put(msg *pb.Message) map[string]struct{}
}

type UpdateableIPNSLocalStorage

type UpdateableIPNSLocalStorage interface {
	IPNSLocalStorage
	AddPeers(IPNSKey cid.Cid, peers ...peer.ID)
	AddOps(IPNSKey cid.Cid, ops ...*AddNodeOperation)
}

UpdateableIPNSLocalStorage is a read/write interface for data that an MWIPNS node might need

func NewMemoryIPNSLocalStorage

func NewMemoryIPNSLocalStorage() UpdateableIPNSLocalStorage

NewMemoryIPNSLocalStorage returns a memory backed UpdateableIPNSLocalStorage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL