Documentation ¶
Overview ¶
Package crdt provides a replicated go-datastore (key-value store) implementation using Merkle-CRDTs built with IPLD nodes.
This Datastore is agnostic to how new MerkleDAG roots are broadcasted to the rest of replicas (`Broadcaster` component) and to how the IPLD nodes are made discoverable and retrievable to by other replicas (`DAGSyncer` component).
The implementation is based on the "Merkle-CRDTs: Merkle-DAGs meet CRDTs" paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira.
Note that, in the absence of compaction (which must be performed manually), a crdt.Datastore will only grow in size even when keys are deleted.
The time to be fully synced for new Datastore replicas will depend on how fast they can retrieve the DAGs announced by the other replicas, but newer values will be available before older ones.
Index ¶
- Variables
- type Broadcaster
- type DAGSyncer
- type Datastore
- func (store *Datastore) Batch() (ds.Batch, error)
- func (store *Datastore) Close() error
- func (store *Datastore) Delete(key ds.Key) error
- func (store *Datastore) Get(key ds.Key) (value []byte, err error)
- func (store *Datastore) GetSize(key ds.Key) (size int, err error)
- func (store *Datastore) Has(key ds.Key) (exists bool, err error)
- func (store *Datastore) PrintDAG() error
- func (store *Datastore) Put(key ds.Key, value []byte) error
- func (store *Datastore) Query(q query.Query) (query.Results, error)
- func (store *Datastore) Sync(prefix ds.Key) error
- type Options
- type PubSubBroadcaster
- type SessionDAGSyncer
Constants ¶
This section is empty.
Variables ¶
var (
ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)
Common errors.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface { // Send payload to other replicas. Broadcast([]byte) error // Obtain the next payload received from the network. Next() ([]byte, error) }
A Broadcaster provides a way to send (notify) an opaque payload to all replicas and to retrieve payloads broadcasted.
type DAGSyncer ¶
type DAGSyncer interface { ipld.DAGService // Returns true if the block is locally available (therefore, it // is considered processed). HasBlock(c cid.Cid) (bool, error) }
A DAGSyncer is an abstraction to an IPLD-based p2p storage layer. A DAGSyncer is a DAGService with the ability to publish new ipld nodes to the network, and retrieving others from it.
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore makes a go-datastore a distributed Key-Value store using Merkle-CRDTs and IPLD.
func New ¶
func New( store ds.Datastore, namespace ds.Key, dagSyncer DAGSyncer, bcast Broadcaster, opts *Options, ) (*Datastore, error)
New returns a Merkle-CRDT-based Datastore using the given one to persist all the necessary data under the given namespace. It needs a DAG-Syncer component for IPLD nodes and a Broadcaster component to distribute and receive information to and from the rest of replicas. Actual implementation of these must be provided by the user, but it normally means using ipfs-lite (https://github.com/hsanjuan/ipfs-lite) as a DAG Syncer and the included libp2p PubSubBroadcaster as a Broadcaster.
The given Datastatore is used to back all CRDT-datastore contents and accounting information. When using an asynchronous datastore, the user is in charge of calling Sync() regularly. Sync() will persist paths related to the given prefix, but note that if other replicas are modifying the datastore, the prefixes that will need syncing are not only those modified by the local replica. Therefore the user should consider calling Sync("/"), with an empty prefix, in that case, or use a synchronouse underlying datastore that persists things directly on write.
The CRDT-Datastore should call Close() before the given store is closed.
func (*Datastore) Batch ¶
Batch implements batching for writes by accumulating Put and Delete in the same CRDT-delta and only applying it and broadcasting it on Commit().
func (*Datastore) Get ¶
Get retrieves the object `value` named by `key`. Get will return ErrNotFound if the key is not mapped to a value.
func (*Datastore) GetSize ¶
GetSize returns the size of the `value` named by `key`. In some contexts, it may be much cheaper to only get the size of the value rather than retrieving the value itself.
func (*Datastore) Has ¶
Has returns whether the `key` is mapped to a `value`. In some contexts, it may be much cheaper only to check for existence of a value, rather than retrieving the value itself. (e.g. HTTP HEAD). The default implementation is found in `GetBackedHas`.
func (*Datastore) PrintDAG ¶
PrintDAG pretty prints the current Merkle-DAG using the given printFunc
func (*Datastore) Query ¶
Query searches the datastore and returns a query result. This function may return before the query actually runs. To wait for the query:
result, _ := ds.Query(q) // use the channel interface; result may come in at different times for entry := range result.Next() { ... } // or wait for the query to be completely done entries, _ := result.Rest() for entry := range entries { ... }
type Options ¶
type Options struct { Logger logging.StandardLogger RebroadcastInterval time.Duration // The PutHook function is triggered whenever an element // is successfully added to the datastore (either by a local // or remote update), and only when that addition is considered the // prevalent value. PutHook func(k ds.Key, v []byte) // The DeleteHook function is triggered whenever a version of an // element is successfully removed from the datastore (either by a // local or remote update). Unordered and concurrent updates may // result in the DeleteHook being triggered even though the element is // still present in the datastore because it was re-added. If that is // relevant, use Has() to check if the removed element is still part // of the datastore. DeleteHook func(k ds.Key) // NumWorkers specifies the number of workers ready to walk DAGs NumWorkers int // DAGSyncerTimeout specifies how long to wait for a DAGSyncer. // Set to 0 to disable. DAGSyncerTimeout time.Duration // MaxBatchDeltaSize will automatically commit any batches whose // delta size gets too big. This helps keep DAG nodes small // enough that they will be transferred by the network. MaxBatchDeltaSize int }
Options holds configurable values for Datastore.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions initializes an Options object with sensible defaults.
type PubSubBroadcaster ¶ added in v0.0.3
type PubSubBroadcaster struct {
// contains filtered or unexported fields
}
PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
func NewPubSubBroadcaster ¶ added in v0.0.3
func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (*PubSubBroadcaster, error)
NewPubSubBroadcaster returns a new broadcaster using the given PubSub and a topic to subscribe/broadcast to. The given context can be used to cancel the broadcaster. Please register any topic validators before creating the Broadcaster.
The broadcaster can be shut down by cancelling the given context. This must be done before Closing the crdt.Datastore, otherwise things may hang.
func (*PubSubBroadcaster) Broadcast ¶ added in v0.0.3
func (pbc *PubSubBroadcaster) Broadcast(data []byte) error
Broadcast publishes some data.
func (*PubSubBroadcaster) Next ¶ added in v0.0.3
func (pbc *PubSubBroadcaster) Next() ([]byte, error)
Next returns published data.
type SessionDAGSyncer ¶ added in v0.0.8
type SessionDAGSyncer interface { DAGSyncer Session(context.Context) ipld.NodeGetter }
A SessionDAGSyncer is a Sessions-enabled DAGSyncer. This type of DAG-Syncer provides an optimized NodeGetter to make multiple related requests. The same session-enabled NodeGetter is used to download DAG branches when the DAGSyncer supports it.