Package crdt

Published: Jun 9, 2020 | Licenses: Apache-2.0, MIT | Module:


Package crdt provides a replicated go-datastore (key-value store) implementation using Merkle-CRDTs built with IPLD nodes.

This Datastore is agnostic to how new MerkleDAG roots are broadcast to the rest of the replicas (`Broadcaster` component) and to how the IPLD nodes are made discoverable and retrievable by other replicas (`DAGSyncer` component).

The implementation is based on the "Merkle-CRDTs: Merkle-DAGs meet CRDTs" paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira.

Note that, in the absence of compaction (which must be performed manually), a crdt.Datastore will only grow in size even when keys are deleted.

The time to be fully synced for new Datastore replicas will depend on how fast they can retrieve the DAGs announced by the other replicas, but newer values will be available before older ones.
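One way to picture why newer values can become available first is a traversal that starts at the announced roots and works backwards. The following is a minimal sketch under that assumption, not the package's actual traversal: `node`, `walk` and the map-based DAG are hypothetical names invented for illustration.

```go
package main

import "fmt"

// node is a hypothetical DAG node: a delta identified by id, linking to
// the older nodes it was built on.
type node struct {
	id    string
	older []string
}

// walk visits the DAG breadth-first from the announced heads, so nodes
// closer to the heads (newer) are visited before their ancestors.
func walk(heads []string, dag map[string]node) []string {
	seen := map[string]bool{}
	queue := append([]string(nil), heads...)
	var order []string
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		if seen[id] {
			continue // already processed through another branch
		}
		seen[id] = true
		order = append(order, id)
		queue = append(queue, dag[id].older...)
	}
	return order
}

func main() {
	dag := map[string]node{
		"c": {id: "c", older: []string{"b"}},
		"b": {id: "b", older: []string{"a"}},
		"a": {id: "a"},
	}
	fmt.Println(walk([]string{"c"}, dag)) // [c b a]
}
```

In this toy model a replica that has only fetched the first node already knows the most recent delta, while older history arrives as the walk proceeds.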



var (
	ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)

Common errors.

type Broadcaster

type Broadcaster interface {
	// Send payload to other replicas.
	Broadcast([]byte) error
	// Obtain the next payload received from the network.
	Next() ([]byte, error)
}

A Broadcaster provides a way to send (notify) an opaque payload to all replicas and to retrieve the payloads broadcast by other replicas.
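To make the contract concrete, here is a minimal single-process sketch of a type with the two documented methods. It redeclares the `Broadcaster` interface locally so that it compiles on its own; `chanBroadcaster`, `newChanBroadcaster`, `Close` and `errClosed` are hypothetical names for this illustration and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Broadcaster mirrors the interface documented above so that this sketch
// compiles without importing the package.
type Broadcaster interface {
	Broadcast([]byte) error
	Next() ([]byte, error)
}

// errClosed is a hypothetical error used only by this sketch.
var errClosed = errors.New("broadcaster closed")

// chanBroadcaster is a hypothetical in-memory Broadcaster: every payload
// that is broadcast is queued and later handed out by Next.
type chanBroadcaster struct {
	queue chan []byte
}

func newChanBroadcaster(size int) *chanBroadcaster {
	return &chanBroadcaster{queue: make(chan []byte, size)}
}

// Broadcast copies the payload and queues it, failing if the queue is full.
// It must not be called after Close.
func (b *chanBroadcaster) Broadcast(p []byte) error {
	cp := append([]byte(nil), p...)
	select {
	case b.queue <- cp:
		return nil
	default:
		return errors.New("broadcast queue full")
	}
}

// Next blocks until a payload is available or the queue has been closed.
func (b *chanBroadcaster) Next() ([]byte, error) {
	p, ok := <-b.queue
	if !ok {
		return nil, errClosed
	}
	return p, nil
}

// Close signals that no more payloads will arrive.
func (b *chanBroadcaster) Close() { close(b.queue) }

func main() {
	var bc Broadcaster = newChanBroadcaster(8)
	_ = bc.Broadcast([]byte("new-root"))
	p, err := bc.Next()
	fmt.Println(string(p), err)
}
```

The payload is treated as opaque bytes throughout, which is all the interface requires; a real implementation would deliver it to other processes rather than back to the sender.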

type DAGSyncer

type DAGSyncer interface {
	// Returns true if the block is locally available (therefore, it
	// is considered processed).
	HasBlock(c cid.Cid) (bool, error)
}

A DAGSyncer is an abstraction to an IPLD-based p2p storage layer. A DAGSyncer is a DAGService with the ability to publish new IPLD nodes to the network and to retrieve others from it.

type Datastore

type Datastore struct {
	// contains filtered or unexported fields
}

Datastore makes a go-datastore a distributed Key-Value store using Merkle-CRDTs and IPLD.

func New

func New(
	store ds.Datastore,
	namespace ds.Key,
	dagSyncer DAGSyncer,
	bcast Broadcaster,
	opts *Options,
) (*Datastore, error)

New returns a Merkle-CRDT-based Datastore using the given one to persist all the necessary data under the given namespace. It needs a DAG-Syncer component for IPLD nodes and a Broadcaster component to distribute and receive information to and from the rest of the replicas. Actual implementations of these must be provided by the user; normally this means using ipfs-lite as a DAG Syncer and the included libp2p PubSubBroadcaster as a Broadcaster.

The given Datastore is used to back all CRDT-datastore contents and accounting information. When using an asynchronous datastore, the user is in charge of calling Sync() regularly. Sync() will persist paths related to the given prefix, but note that if other replicas are modifying the datastore, the prefixes that will need syncing are not only those modified by the local replica. Therefore the user should consider calling Sync("/"), with an empty prefix, in that case, or use a synchronous underlying datastore that persists things directly on write.

Close() should be called on the CRDT-Datastore before the given store is closed.

func (*Datastore) Batch

func (store *Datastore) Batch() (ds.Batch, error)

Batch implements batching for writes by accumulating Put and Delete in the same CRDT-delta and only applying it and broadcasting it on Commit().
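The accumulate-then-commit behaviour can be modelled with a toy batch. This is a minimal sketch and not the package's implementation: `op`, `batch`, `newBatch` and the `apply` callback (standing in for "apply the delta and broadcast it") are hypothetical, and string keys replace `ds.Key`.

```go
package main

import "fmt"

// op is a hypothetical record of one batched operation.
type op struct {
	del   bool
	key   string
	value []byte
}

// batch accumulates operations and hands them to apply only on Commit.
type batch struct {
	ops   []op
	apply func([]op) error // stands in for "apply the delta and broadcast it"
}

func newBatch(apply func([]op) error) *batch {
	return &batch{apply: apply}
}

// Put records a write without applying it.
func (b *batch) Put(key string, value []byte) {
	b.ops = append(b.ops, op{key: key, value: value})
}

// Delete records a removal without applying it.
func (b *batch) Delete(key string) {
	b.ops = append(b.ops, op{del: true, key: key})
}

// Commit applies everything accumulated so far in a single call and
// resets the batch. An empty batch is a no-op.
func (b *batch) Commit() error {
	if len(b.ops) == 0 {
		return nil
	}
	if err := b.apply(b.ops); err != nil {
		return err
	}
	b.ops = nil
	return nil
}

func main() {
	applied := 0
	b := newBatch(func(ops []op) error {
		applied++
		fmt.Println("applying", len(ops), "operations as one delta")
		return nil
	})
	b.Put("/a", []byte("1"))
	b.Put("/b", []byte("2"))
	b.Delete("/a")
	fmt.Println("applied before commit:", applied)
	_ = b.Commit()
	fmt.Println("applied after commit:", applied)
}
```

The point of the sketch is that three operations result in one apply call, which in the real datastore would correspond to one delta and one broadcast instead of three.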

func (*Datastore) Close

func (store *Datastore) Close() error

Close shuts down the CRDT datastore. It should not be used afterwards.

func (*Datastore) Delete

func (store *Datastore) Delete(key ds.Key) error

Delete removes the value for the given `key`.

func (*Datastore) Get

func (store *Datastore) Get(key ds.Key) (value []byte, err error)

Get retrieves the object `value` named by `key`. Get will return ErrNotFound if the key is not mapped to a value.

func (*Datastore) GetSize

func (store *Datastore) GetSize(key ds.Key) (size int, err error)

GetSize returns the size of the `value` named by `key`. In some contexts, it may be much cheaper to only get the size of the value rather than retrieving the value itself.

func (*Datastore) Has

func (store *Datastore) Has(key ds.Key) (exists bool, err error)

Has returns whether the `key` is mapped to a `value`. In some contexts, it may be much cheaper only to check for existence of a value, rather than retrieving the value itself (e.g. HTTP HEAD). The default implementation is found in `GetBackedHas`.

func (*Datastore) PrintDAG

func (store *Datastore) PrintDAG() error

PrintDAG pretty-prints the current Merkle-DAG.

func (*Datastore) Put

func (store *Datastore) Put(key ds.Key, value []byte) error

Put stores the object `value` named by `key`.

func (*Datastore) Query

func (store *Datastore) Query(q query.Query) (query.Results, error)

Query searches the datastore and returns a query result. This function may return before the query actually runs. To wait for the query:

result, _ := ds.Query(q)

// use the channel interface; result may come in at different times
for entry := range result.Next() { ... }

// or wait for the query to be completely done
entries, _ := result.Rest()
for _, entry := range entries { ... }

func (*Datastore) Sync

func (store *Datastore) Sync(prefix ds.Key) error

Sync ensures that all the data under the given prefix is flushed to disk in the underlying datastore.

type Options

type Options struct {
	Logger              logging.StandardLogger
	RebroadcastInterval time.Duration
	// The PutHook function is triggered whenever an element
	// is successfully added to the datastore (either by a local
	// or remote update), and only when that addition is considered the
	// prevalent value.
	PutHook func(k ds.Key, v []byte)
	// The DeleteHook function is triggered whenever a version of an
	// element is successfully removed from the datastore (either by a
	// local or remote update). Unordered and concurrent updates may
	// result in the DeleteHook being triggered even though the element is
	// still present in the datastore because it was re-added. If that is
	// relevant, use Has() to check if the removed element is still part
	// of the datastore.
	DeleteHook func(k ds.Key)
	// NumWorkers specifies the number of workers ready to walk DAGs
	NumWorkers int
	// DAGSyncerTimeout specifies how long to wait for a DAGSyncer.
	// Set to 0 to disable.
	DAGSyncerTimeout time.Duration
	// MaxBatchDeltaSize will automatically commit any batches whose
	// delta size gets too big. This helps keep DAG nodes small
	// enough that they will be transferred by the network.
	MaxBatchDeltaSize int
}

Options holds configurable values for Datastore.
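The advice in the DeleteHook comment (check Has() when it matters whether the element was re-added) can be sketched with a toy store that keeps a derived index up to date through hooks. Everything here is a hypothetical model: `hooks` mirrors only the two hook fields with string keys in place of `ds.Key`, and `toyStore`, `staleDelete` and `newIndexedStore` are invented for illustration.

```go
package main

import "fmt"

// hooks mirrors the PutHook and DeleteHook fields of Options, with string
// keys replacing ds.Key so the sketch has no external dependencies.
type hooks struct {
	PutHook    func(k string, v []byte)
	DeleteHook func(k string)
}

// toyStore is a hypothetical map-backed store that fires the hooks.
type toyStore struct {
	data map[string][]byte
	h    hooks
}

func (s *toyStore) Put(k string, v []byte) {
	s.data[k] = v
	if s.h.PutHook != nil {
		s.h.PutHook(k, v)
	}
}

func (s *toyStore) Delete(k string) {
	delete(s.data, k)
	if s.h.DeleteHook != nil {
		s.h.DeleteHook(k)
	}
}

func (s *toyStore) Has(k string) bool {
	_, ok := s.data[k]
	return ok
}

// staleDelete simulates the removal of an older version of k arriving
// after k was re-added: the hook fires but the current value stays.
func (s *toyStore) staleDelete(k string) {
	if s.h.DeleteHook != nil {
		s.h.DeleteHook(k)
	}
}

// newIndexedStore wires hooks that keep a set of live keys in sync.
func newIndexedStore() (*toyStore, map[string]bool) {
	index := map[string]bool{}
	s := &toyStore{data: map[string][]byte{}}
	s.h = hooks{
		PutHook: func(k string, v []byte) { index[k] = true },
		DeleteHook: func(k string) {
			if s.Has(k) {
				return // still present (re-added): keep the index entry
			}
			delete(index, k)
		},
	}
	return s, index
}

func main() {
	s, index := newIndexedStore()
	s.Put("/a", []byte("1"))
	s.staleDelete("/a")
	fmt.Println("after stale delete:", index["/a"])
	s.Delete("/a")
	fmt.Println("after real delete:", index["/a"])
}
```

Without the Has() check, the stale delete would evict an index entry for a key that still holds a value.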

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions initializes an Options object with sensible defaults.

type PubSubBroadcaster

type PubSubBroadcaster struct {
	// contains filtered or unexported fields
}

PubSubBroadcaster implements a Broadcaster using libp2p PubSub.

func NewPubSubBroadcaster

func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (*PubSubBroadcaster, error)

NewPubSubBroadcaster returns a new broadcaster using the given PubSub and a topic to subscribe/broadcast to. The given context can be used to cancel the broadcaster. Please register any topic validators before creating the Broadcaster.

The broadcaster can be shut down by cancelling the given context. This must be done before closing the crdt.Datastore, otherwise things may hang.

func (*PubSubBroadcaster) Broadcast

func (pbc *PubSubBroadcaster) Broadcast(data []byte) error

Broadcast publishes some data.

func (*PubSubBroadcaster) Next

func (pbc *PubSubBroadcaster) Next() ([]byte, error)

Next returns published data.

type SessionDAGSyncer

type SessionDAGSyncer interface {
	Session(context.Context) ipld.NodeGetter
}

A SessionDAGSyncer is a Sessions-enabled DAGSyncer. This type of DAG-Syncer provides an optimized NodeGetter to make multiple related requests. The same session-enabled NodeGetter is used to download DAG branches when the DAGSyncer supports it.

Documentation was rendered with GOOS=linux and GOARCH=amd64.
