dsync

package
v0.2.2
Published: May 4, 2021 License: MIT Imports: 28 Imported by: 3

Documentation

Overview

Package dsync implements point-to-point merkle-DAG-syncing between a local instance and a remote source. It's like rsync, but specific to merkle-DAGs. dsync operates over HTTP and (soon) libp2p connections.

dsync by default can push & fetch DAGs to another dsync instance, called the "remote". Dsync instances that want to accept merkle-DAGs must opt into operating as a remote by configuring a dsync.Dsync instance to do so.

Dsync is structured as bring-your-own DAG vetting. All push requests are run through two "check" functions called at the beginning and end of the push process. Each check function supplies details about the push being requested or completed. The default initial check function rejects all requests, and must be overridden to accept data.

Constants

const (
	// DsyncProtocolID is the dsync p2p Protocol Identifier & version tag
	DsyncProtocolID = protocol.ID("/dsync/0.2.0")
)

Variables

var (
	// ErrRemoveNotSupported is the error value returned by remotes that don't
	// support delete operations
	ErrRemoveNotSupported = fmt.Errorf("remove is not supported")
	// ErrUnknownProtocolVersion is the error for when the version of the remote
	// protocol is unknown, usually because the handshake with the remote
	// hasn't happened yet
	ErrUnknownProtocolVersion = fmt.Errorf("unknown protocol version")
)
var DefaultDagFinalCheck = func(context.Context, dag.Info, map[string]string) error {
	return nil
}

DefaultDagFinalCheck by default performs no check

var DefaultDagPrecheck = func(context.Context, dag.Info, map[string]string) error {
	return fmt.Errorf("remote is not configured to accept DAGs")
}

DefaultDagPrecheck rejects all requests. Dsync users are required to override this hook to make dsync work, and are expected to supply a trust model in this hook. An example trust model is a peerID and contentID accept/reject list supplied by the application.

Precheck could also reject based on size limitations by examining data provided by the requester, but be advised the Info provided is gossip at this point in the sync process.

If the Precheck hook returns an error the remote will deny the request to push any blocks, no session will be created and the error message will be returned in the response status.

Functions

func AddAllFromCARReader added in v0.2.2

func AddAllFromCARReader(ctx context.Context, bapi coreiface.BlockAPI, r io.Reader, progCh chan cid.Cid) (int, error)

AddAllFromCARReader consumes a CAR reader stream, placing all blocks in the given blockstore

func HTTPRemoteHandler added in v0.2.0

func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc

HTTPRemoteHandler exposes a Dsync remote over HTTP by exposing an HTTP handler that interlocks with methods exposed by HTTPClient

func NewLocalNodeGetter added in v0.2.0

func NewLocalNodeGetter(api coreiface.CoreAPI) (ipld.NodeGetter, error)

NewLocalNodeGetter creates a local NodeGetter from an ipfs CoreAPI instance. "Local" NodeGetters don't fetch over the dweb.

It's important to pass Dsync a NodeGetter instance that doesn't perform any network operations when trying to resolve blocks. If we don't do this, dsync will ask ipfs for blocks it doesn't have, and ipfs will try to *fetch* these blocks, which kinda defeats the point of syncing blocks by other means.

func NewManifestCARReader added in v0.2.2

func NewManifestCARReader(ctx context.Context, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) (io.Reader, error)

NewManifestCARReader creates a Content-addressed ARchive (CAR) on the fly from a manifest and a node getter. It fetches blocks in order from the list of cids in the manifest and writes them to a buffer as the reader is consumed. The roots specified in the archive header match the manifest RootCID method.

If an incomplete manifest graph is passed to NewManifestCARReader, the resulting archive will not be a complete graph. This is permitted by the spec, and used by dsync to create an archive of only-missing-blocks. For more on CAR files, see: https://github.com/ipld/specs/blob/master/block-layer/content-addressable-archives.md

If supplied a non-nil progress channel, the stream will send each CID on the channel as it is buffered to the read stream.

func OptLibp2pHost added in v0.2.0

func OptLibp2pHost(host host.Host) func(cfg *Config)

OptLibp2pHost is a convenience function for supplying a libp2p.Host to dsync.New
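Options passed to New are plain functions that mutate a Config. The sketch below shows that pattern with a trimmed-down stand-in Config; optHTTPRemoteAddress and applyOptions are hypothetical helpers written for illustration and are not part of this package.

```go
package main

import "fmt"

// Config is a trimmed-down stand-in for dsync.Config.
type Config struct {
	HTTPRemoteAddress string
	AllowRemoves      bool
}

// optHTTPRemoteAddress is a hypothetical option helper in the same style as
// OptLibp2pHost: it returns a function that sets a single Config field.
func optHTTPRemoteAddress(addr string) func(cfg *Config) {
	return func(cfg *Config) { cfg.HTTPRemoteAddress = addr }
}

// applyOptions shows how a constructor can fold variadic options into a
// Config. Options run in the order given, so later ones overwrite earlier ones.
func applyOptions(opts ...func(cfg *Config)) *Config {
	cfg := &Config{}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := applyOptions(
		optHTTPRemoteAddress(":9595"),
		// an inline option works just as well as a named helper
		func(cfg *Config) { cfg.AllowRemoves = true },
	)
	fmt.Printf("%+v\n", *cfg)
}
```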

Types

type Config added in v0.2.0

type Config struct {
	// InfoStore is an optional caching layer for dag.Info objects
	InfoStore dag.InfoStore
	// provide a listening address to have Dsync spin up an HTTP server when
	// StartRemote(ctx) is called
	HTTPRemoteAddress string
	// to send & push over libp2p connections, provide a libp2p host
	Libp2pHost host.Host
	// PinAPI is required for remotes to accept pinning requests
	PinAPI coreiface.PinAPI

	// RequireAllBlocks will skip checking for blocks already present on the
	// remote, requiring push requests to send all blocks each time
	// This is a helpful override if the receiving node can't distinguish between
	// local and network block access, as with the ipfs-http-api interface
	RequireAllBlocks bool
	// AllowRemoves lets dsync opt into remove requests. removes are
	// disabled by default
	AllowRemoves bool

	// required check function for a remote accepting DAGs, this hook will be
	// called before a push is allowed to begin
	PushPreCheck Hook
	// optional check function for screening a receive before potentially pinning
	PushFinalCheck Hook
	// optional check function called after successful transfer
	PushComplete Hook
	// optional check to run on dagInfo requests before sending an info back
	GetDagInfoCheck Hook
	// optional hook to run before allowing a stream of blocks
	OpenBlockStreamCheck Hook
	// optional check to run before executing a remove operation
	// the dag.Info given to this check will only contain the root CID being
	// removed
	RemoveCheck Hook
}

Config encapsulates optional Dsync configuration

func (*Config) Validate added in v0.2.0

func (cfg *Config) Validate() error

Validate confirms the configuration is valid

type DagStreamable added in v0.2.2

type DagStreamable interface {
	// ReceiveBlocks asks a remote to accept a stream of blocks from a local
	// client, this can only happen within a push session
	ReceiveBlocks(ctx context.Context, sessionID string, r io.Reader) error
	// OpenBlockStream asks a remote to generate a block stream
	OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
}

DagStreamable is an interface for sending and fetching all blocks in a given manifest in one trip

type DagSyncable added in v0.2.0

type DagSyncable interface {
	// NewReceiveSession starts a push session from local to a remote.
	// The remote will return a delta manifest of blocks the remote needs
	// and a session id that must be sent with each block
	NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
	// ProtocolVersion indicates the version of dsync the remote speaks, only
	// available after a handshake is established. Calling this method before a
	// handshake must return ErrUnknownProtocolVersion
	ProtocolVersion() (protocol.ID, error)

	// ReceiveBlock places a block on the remote
	ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
	// GetDagInfo asks the remote for info specified by the root identifier
	// string of a DAG
	GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error)
	// GetBlock gets a block of data from the remote
	GetBlock(ctx context.Context, hash string) (rawdata []byte, err error)
	// RemoveCID asks the remote to remove a cid. Supporting deletes is optional.
	// DagSyncables that don't implement RemoveCID must return
	// ErrRemoveNotSupported
	RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error)
}

DagSyncable is a source that can be synced to & from. dsync requests automate calls to this interface with higher-order functions like Push and Pull

In order to coordinate between a local and a remote, you need something that will satisfy the DagSyncable interface on both ends of the wire, one to act as the requester and the other to act as the remote
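The "delta manifest" returned by NewReceiveSession is, conceptually, the subset of manifest entries the remote does not already hold. A minimal sketch of that idea follows, using plain strings and a set in place of dag.Manifest and a real block store; missingBlocks is a hypothetical name, not a function in this package.

```go
package main

import "fmt"

// missingBlocks returns the identifiers from nodes that are absent from have,
// preserving the order in which they appear in the manifest.
func missingBlocks(nodes []string, have map[string]bool) []string {
	var diff []string
	for _, id := range nodes {
		if !have[id] {
			diff = append(diff, id)
		}
	}
	return diff
}

func main() {
	manifest := []string{"root", "a", "b", "c"}
	remoteHas := map[string]bool{"a": true, "c": true}

	// only the blocks the remote lacks need to be sent
	fmt.Println(missingBlocks(manifest, remoteHas)) // [root b]
}
```

With RequireAllBlocks set, a remote would skip this comparison and ask for every block in the manifest.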

type Dsync added in v0.2.0

type Dsync struct {
	// contains filtered or unexported fields
}

Dsync is a service for synchronizing a DAG of blocks between a local & remote source

func New added in v0.2.0

func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func(cfg *Config)) (*Dsync, error)

New creates a local Dsync service. By default Dsync can push and pull to remotes. It can be configured to act as a remote for other Dsync instances.

It's crucial that the NodeGetter passed to New be an offline-only getter. If using IPFS, this package defines a helper function, NewLocalNodeGetter, to get an offline-only node getter from an ipfs CoreAPI interface.

Example
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"strings"
	"testing"

	"github.com/ipfs/go-cid"
	files "github.com/ipfs/go-ipfs-files"
	coreiface "github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/path"
	"github.com/qri-io/dag"
)

func main() {
	// first some boilerplate setup. In this example we're using "full" IPFS nodes
	// but all that's required is a blockstore and dag core api implementation
	// in this example we're going to use a single context, which doesn't make
	// much sense in production. At a minimum the contexts for nodeA & nodeB
	// would be separate
	ctx, done := context.WithCancel(context.Background())
	defer done()

	// nodeA is an ipfs instance that will create a DAG
	// nodeB is another ipfs instance nodeA will push that DAG to
	nodeA, nodeB := mustNewLocalRemoteIPFSAPI(ctx)

	// Local Setup:
	// add a single block graph to nodeA getting back a content identifier
	cid := mustAddOneBlockDAG(nodeA)

	// make a localNodeGetter, when performing dsync we don't want to fetch
	// blocks from the dweb
	aLocalDS, err := NewLocalNodeGetter(nodeA)
	if err != nil {
		panic(err) // don't panic. real programs handle errors.
	}

	// create nodeA's Dsync instance
	aDsync, err := New(aLocalDS, nodeA.Block())
	if err != nil {
		panic(err)
	}

	// Remote setup:
	// setup the remote we're going to push to, starting by creating a local node
	// getter
	bng, err := NewLocalNodeGetter(nodeB)
	if err != nil {
		panic(err)
	}

	// we're going to set up our remote to push over HTTP
	bAddr := ":9595"

	// create the remote instance, configuring it to accept DAGs
	bDsync, err := New(bng, nodeB.Block(), func(cfg *Config) {
		// configure the remote listening address:
		cfg.HTTPRemoteAddress = bAddr

		// we MUST override the PreCheck function. In this example we're making sure
		// no one sends us a bad hash:
		cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
			if info.Manifest.Nodes[0] == "BadHash" {
				return fmt.Errorf("rejected for secret reasons")
			}
			return nil
		}

		// in order for remotes to allow pinning, they must be provided a PinAPI:
		cfg.PinAPI = nodeB.Pin()
	})
	if err != nil {
		panic(err)
	}

	// start listening for remote pushes & pulls. This should be long running,
	// like a server. Cancel the provided context to close
	if err = bDsync.StartRemote(ctx); err != nil {
		panic(err)
	}

	// Create a Push:
	push, err := aDsync.NewPush(cid.String(), fmt.Sprintf("http://localhost%s/dsync", bAddr), true)
	if err != nil {
		panic(err)
	}

	// We want to see progress, so we spin up a goroutine to listen for updates
	waitForFmt := make(chan struct{})
	go func() {
		updates := push.Updates()
		for {
			select {
			case update := <-updates:
				fmt.Printf("%d/%d blocks transferred\n", update.CompletedBlocks(), len(update))
				if update.Complete() {
					fmt.Println("done!")
					waitForFmt <- struct{}{}
					// the transfer is finished, stop listening
					return
				}

			case <-ctx.Done():
				// don't leak goroutines
				waitForFmt <- struct{}{}
				return
			}
		}
	}()

	// Do the push
	if err := push.Do(ctx); err != nil {
		panic(err)
	}
	// at this point we know the update is finished

	// prove the block is now in nodeB:
	_, err = nodeB.Block().Get(ctx, path.New(cid.String()))
	if err != nil {
		panic(err)
	}

	// block until updates has had a chance to print
	<-waitForFmt

}

func mustNewLocalRemoteIPFSAPI(ctx context.Context) (local, remote coreiface.CoreAPI) {
	_, a, err := makeAPI(ctx)
	if err != nil {
		panic(err)
	}

	_, b, err := makeAPI(ctx)
	if err != nil {
		panic(err)
	}

	return a, b
}

func newLocalRemoteIPFSAPI(ctx context.Context, t *testing.T) (local, remote coreiface.CoreAPI) {
	_, a, err := makeAPI(ctx)
	if err != nil {
		t.Fatal(err)
	}

	_, b, err := makeAPI(ctx)
	if err != nil {
		t.Fatal(err)
	}

	return a, b
}

func mustAddOneBlockDAG(node coreiface.CoreAPI) cid.Cid {
	ctx := context.Background()
	f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350))))
	path, err := node.Unixfs().Add(ctx, f)
	if err != nil {
		panic(err)
	}
	return path.Cid()
}

func addOneBlockDAG(node coreiface.CoreAPI, t *testing.T) cid.Cid {
	ctx := context.Background()
	f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350))))
	path, err := node.Unixfs().Add(ctx, f)
	if err != nil {
		t.Fatal(err)
	}
	return path.Cid()
}
Output:

0/1 blocks transferred
1/1 blocks transferred
done!

func NewTestDsync added in v0.2.0

func NewTestDsync() *Dsync

NewTestDsync returns a Dsync pointer suitable for testing

func (*Dsync) GetBlock added in v0.2.0

func (ds *Dsync) GetBlock(ctx context.Context, hash string) ([]byte, error)

GetBlock returns a single block from the store

func (*Dsync) GetDagInfo added in v0.2.0

func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error)

GetDagInfo gets the manifest for a DAG rooted at id, checking any configured cache before falling back to generating a new manifest

func (*Dsync) NewPull added in v0.2.0

func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error)

NewPull creates a pull. A pull fetches an entire DAG from a remote, placing it in the local block store

func (*Dsync) NewPush added in v0.2.0

func (ds *Dsync) NewPush(cidStr, remoteAddr string, pinOnComplete bool) (*Push, error)

NewPush creates a push from Dsync to a remote address

func (*Dsync) NewPushInfo added in v0.2.0

func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bool) (*Push, error)

NewPushInfo creates a push from an existing dag.Info. All blocks in the info manifest must be accessible from the local Dsync block repository

func (*Dsync) NewReceiveSession added in v0.2.0

func (ds *Dsync) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)

NewReceiveSession takes a manifest sent by a remote and initiates a transfer session. It returns a manifest/diff of the blocks the receiver needs to have a complete DAG. New sessions are created with a deadline for completion.

func (*Dsync) OpenBlockStream added in v0.2.2

func (ds *Dsync) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)

OpenBlockStream creates a block stream of the contents of the dag.Info

func (*Dsync) ProtocolVersion added in v0.2.2

func (ds *Dsync) ProtocolVersion() (protocol.ID, error)

ProtocolVersion reports the current protocol version for dsync

func (*Dsync) ReceiveBlock added in v0.2.0

func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse

ReceiveBlock adds one block to the local node that was sent by the remote node. It notes in the Receive which nodes have been added. When the DAG is complete, it puts the manifest into a DAG info and the DAG info into an infoStore.

func (*Dsync) ReceiveBlocks added in v0.2.2

func (ds *Dsync) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error

ReceiveBlocks ingests blocks being pushed into the local store

func (*Dsync) RemoveCID added in v0.2.0

func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error

RemoveCID unpins a CID if removes are enabled. It does not immediately remove unpinned content.

func (*Dsync) StartRemote added in v0.2.0

func (ds *Dsync) StartRemote(ctx context.Context) error

StartRemote makes dsync available for remote requests, starting an HTTP server if a listening address is specified. StartRemote returns immediately. Stop remote service by cancelling the passed-in context.

type HTTPClient added in v0.2.0

type HTTPClient struct {
	URL        string
	NodeGetter format.NodeGetter
	BlockAPI   coreiface.BlockAPI
	// contains filtered or unexported fields
}

HTTPClient is the request side of doing dsync over HTTP

func (*HTTPClient) GetBlock added in v0.2.0

func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, err error)

GetBlock fetches a block from a remote source over HTTP

func (*HTTPClient) GetDagInfo added in v0.2.0

func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error)

GetDagInfo fetches a manifest from a remote source over HTTP

func (*HTTPClient) NewReceiveSession added in v0.2.0

func (rem *HTTPClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)

NewReceiveSession initiates a session for pushing blocks to a remote. It sends a Manifest to a remote source over HTTP

func (*HTTPClient) OpenBlockStream added in v0.2.2

func (rem *HTTPClient) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)

OpenBlockStream sends a dag.Info to the remote & asks that it returns a stream of blocks in the info's manifest

func (*HTTPClient) ProtocolVersion added in v0.2.2

func (rem *HTTPClient) ProtocolVersion() (protocol.ID, error)

ProtocolVersion indicates the version of dsync the remote speaks, only available after a handshake is established

func (*HTTPClient) ReceiveBlock added in v0.2.0

func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse

ReceiveBlock asks a remote to receive a block over HTTP

func (*HTTPClient) ReceiveBlocks added in v0.2.2

func (rem *HTTPClient) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error

ReceiveBlocks writes a block stream as an HTTP PUT request to the remote

func (*HTTPClient) RemoveCID added in v0.2.0

func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string]string) (err error)

RemoveCID asks a remote to remove a CID

type Hook added in v0.2.0

type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error

Hook is a function that a dsync instance will call at specified points in the sync lifecycle

type Pull added in v0.2.0

type Pull struct {
	// contains filtered or unexported fields
}

Pull coordinates the transfer of missing blocks in a DAG from a remote to a block store

func NewPull added in v0.2.0

func NewPull(cidStr string, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error)

NewPull sets up fetching a DAG at an id from a remote

func NewPullWithInfo added in v0.2.0

func NewPullWithInfo(info *dag.Info, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error)

NewPullWithInfo creates a pull when we already have a dag.Info

func (*Pull) Do added in v0.2.0

func (f *Pull) Do(ctx context.Context) (err error)

Do executes the pull, blocking until complete

func (*Pull) Updates added in v0.2.0

func (f *Pull) Updates() <-chan dag.Completion

Updates returns a read-only channel of pull completion changes

type Push added in v0.2.0

type Push struct {
	// contains filtered or unexported fields
}

Push coordinates sending a manifest to a remote, tracking progress and state

func NewPush added in v0.2.0

func NewPush(lng ipld.NodeGetter, info *dag.Info, remote DagSyncable, pinOnComplete bool) (*Push, error)

NewPush initiates a send for a DAG at an id from a local to a remote. Push is initiated by the local node

func (*Push) Do added in v0.2.0

func (snd *Push) Do(ctx context.Context) (err error)

Do executes the push, blocking until complete

func (*Push) SetMeta added in v0.2.0

func (snd *Push) SetMeta(meta map[string]string)

SetMeta associates metadata with a push before it's sent. These details may be leveraged by applications built on top of dsync. They're ignored by dsync. Meta must be set before starting the push

func (*Push) Updates added in v0.2.0

func (snd *Push) Updates() <-chan dag.Completion

Updates returns a read-only channel of Completion objects that depict transfer state

type ReceiveResponse added in v0.2.0

type ReceiveResponse struct {
	Hash   string
	Status ReceiveResponseStatus
	Err    error
}

ReceiveResponse defines the result of sending a block, or attempting to send a block.

type ReceiveResponseStatus added in v0.2.0

type ReceiveResponseStatus int

ReceiveResponseStatus defines types of results for a request

const (
	// StatusErrored indicates the request failed and cannot be retried
	StatusErrored ReceiveResponseStatus = -1
	// StatusOk indicates the request completed successfully
	StatusOk ReceiveResponseStatus = 0
	// StatusRetry indicates the request can be attempted again
	StatusRetry ReceiveResponseStatus = 1
)

func (ReceiveResponseStatus) String added in v0.2.0

func (s ReceiveResponseStatus) String() string

String returns a string representation of the status

Directories

Path Synopsis
DsyncPlugin is an ipfs daemon plugin for embedding dsync functionality directly into IPFS https://github.com/ipfs/go-ipfs-example-plugin
DsyncPlugin is an ipfs daemon plugin for embedding dsync functionality directly into IPFS https://github.com/ipfs/go-ipfs-example-plugin
cli
