crdt

package
v0.0.0-...-a4d05e0
Published: May 11, 2023 License: MIT Imports: 12 Imported by: 0

README

XMTP CRDT

This package implements replicated event streams as Merkle-Clocks, based on this paper. It attempts to treat topics as autonomous, independent units, although the underlying supporting components (store, broadcaster, syncer) will likely be shared among them.

To unpack in broad strokes what "event stream as Merkle-Clock" means, and to introduce the terminology used throughout, let's look at this stream state that was generated by randomly sending 10 events through a network of 3 replicas.

[Figure t0: stream state produced by randomly sending 10 events through a network of 3 replicas]

Each oval captures a single Event of the clock. The arrows point at the most recent preceding Events (the heads) known to the node that received the corresponding message from a client. In other words, the arrows represent "happened after" relationships between the Events as observed by the individual nodes. Collectively, the arrows coming out of an Event are its links to its immediately preceding Events. This whole directed acyclic graph (DAG) forms a clock in the sense that it denotes a partial order over the sequence of Events, captured by the "happened before" links. The order is partial because, for example, we don't know whether Event 4 happened before Event 5 or vice versa, but we do know that both happened after Events 2 and 3, and that both happened before Event 8.
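
To make the link-following concrete, here is a minimal sketch of how a node could decide whether one Event happened before another by walking the DAG. The sketchEvent type, its Cid/Links fields and the lookup callback are assumptions made for this illustration; they are not the package's types.Event.

// sketchEvent is a simplified stand-in for the package's Event type,
// used only for this illustration.
type sketchEvent struct {
	Cid   multihash.Multihash   // content identifier of this Event
	Links []multihash.Multihash // CIDs of the immediately preceding Events
}

// happenedBefore reports whether the Event identified by candidate is an
// ancestor of ev, i.e. whether it "happened before" ev according to the DAG.
// If candidate is not reachable through the links, the order of the two
// Events is unknown: the clock only defines a partial order.
func happenedBefore(ev *sketchEvent, candidate multihash.Multihash, lookup func(multihash.Multihash) *sketchEvent) bool {
	seen := map[string]bool{}
	queue := append([]multihash.Multihash{}, ev.Links...)
	for len(queue) > 0 {
		cid := queue[0]
		queue = queue[1:]
		if seen[string(cid)] {
			continue
		}
		seen[string(cid)] = true
		if string(cid) == string(candidate) {
			return true
		}
		if linked := lookup(cid); linked != nil {
			queue = append(queue, linked.Links...)
		}
	}
	return false
}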

What makes this clock a Merkle-Clock are the long (shortened in the picture) hex sequences associated with each Event. These are content identifiers, CIDs, which

  1. uniquely identify each Event
  2. capture the Event payload (the message is hashed into the CID)
  3. capture the Event links (the CIDs of the linked events are also hashed into the CID)
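
A rough sketch of what that means in code (an illustration only; the package's actual Event serialization and hash choice may differ):

// computeCID illustrates points 2 and 3 above: both the payload and the link
// CIDs feed into the hash, so changing either produces a different identifier.
// The byte layout and the use of SHA2-256 are assumptions made for this
// sketch, not the package's actual encoding.
func computeCID(payload []byte, links []multihash.Multihash) (multihash.Multihash, error) {
	var buf bytes.Buffer
	buf.Write(payload)
	for _, link := range links {
		buf.Write(link) // each linked Event's CID contributes to this Event's CID
	}
	return multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1)
}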

The goal of the event stream as Merkle-Clock is to support eventually consistent replication of events among the replicas. Events can be received by any node at any time. We want to make sure that all replicas participating in the stream eventually have all the events that happened across the network. Moreover, we want all the nodes to agree on the ordering of the events, i.e. given enough time they will all end up with the same DAG. The CIDs make it easy for nodes to see whether they already have a given Event and whether they are missing any of its links. The CIDs also make Events immutable, which allows nodes to stop following chains of missing links once they reach Events they already know.

Replication is mediated by three supporting facilities: the store, the broadcaster and the syncer. The store simply persists Events on a node, allowing the node to serve the Events it has to its clients and to other nodes. The broadcaster is used by nodes to announce new Events they create to the rest of the network. However, broadcasts are not reliable: they may not reach some nodes because of transient network issues or because nodes are temporarily offline. This means a node may receive a broadcast Event some of whose links point at Events unknown to it. In that case the node can use the syncer to request the missing Events from the rest of the network. This way nodes can backfill their incomplete DAGs and catch up with the others.
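
A simplified sketch of that receive-and-backfill flow, using only the interfaces documented below (it glosses over concurrency, retries and error handling):

// receiveLoop sketches how a replica can combine the three facilities: take
// Events off the broadcaster, store them as new heads, and use the syncer to
// backfill any linked Events it does not have yet. Illustration only.
func receiveLoop(ctx context.Context, store Store, bc Broadcaster, syncer Syncer) error {
	for {
		ev, err := bc.Next(ctx)
		if err != nil {
			return err
		}
		if added, err := store.InsertHead(ctx, ev); err != nil || !added {
			continue // error or already known; nothing new to backfill
		}
		// Backfill until no links point at Events missing from the store.
		// A real implementation would bound or background this work.
		for {
			missing, err := store.FindMissingLinks(ctx)
			if err != nil {
				return err
			}
			if len(missing) == 0 {
				break
			}
			fetched, err := syncer.Fetch(ctx, missing)
			if err != nil {
				return err
			}
			inserted := 0
			for _, fev := range fetched {
				if fev == nil {
					continue // Fetch may return nils for Events it could not find
				}
				added, err := store.InsertEvent(ctx, fev)
				if err != nil {
					return err
				}
				if added {
					inserted++
				}
			}
			if inserted == 0 {
				break // made no progress this round; leave the rest for a later sync
			}
		}
	}
}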

The implementation here follows this broad structure: the Event, Topic and Node, together with the Store, Broadcaster and Syncer, are its main components.

TODO

  • bootstrapping a new node efficiently
  • investigate possible lockups
  • metrics
  • fetching multiple cids at a time
  • fetching multiple topics at a time
  • validation
  • consistent ordering respecting the DAG
  • deep linking to improve DAG sync speed
  • rebroadcasting heads to catch up dormant topics
  • what, if anything, should we do about inactive/dormant topics?
  • verify correctness: is there any scenario where nodes may fail to fully sync?

Documentation

Index

Constants

This section is empty.

Variables

var ErrStoreInvalidCursor = errors.New("cursor event not found")

The cursor event couldn't be found in the event range resulting from the parameters of the query. The store may have changed since the query that yielded that cursor.
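
One way a caller might react to this error is to drop the stale cursor and re-issue the query. This is a sketch only; the req.PagingInfo.Cursor access is an assumption about the messagev1 request type, not something this package prescribes.

// queryWithStaleCursorRetry sketches how a caller could handle
// ErrStoreInvalidCursor: discard the stale cursor and query again from the
// beginning. Adapt the request field access to the actual messagev1 types.
func queryWithStaleCursorRetry(ctx context.Context, store Store, req *messagev1.QueryRequest) (*messagev1.QueryResponse, error) {
	resp, err := store.Query(ctx, req)
	if errors.Is(err, ErrStoreInvalidCursor) && req.PagingInfo != nil {
		req.PagingInfo.Cursor = nil // the cursor no longer matches the store contents
		return store.Query(ctx, req)
	}
	return resp, err
}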

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster interface {
	// Broadcast sends an Event out to the network
	Broadcast(context.Context, *types.Event) error

	// Next obtains the next event received from the network.
	Next(ctx context.Context) (*types.Event, error)

	// Close gracefully closes the broadcaster.
	Close() error
}

Broadcaster manages broadcasts for a replica.
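
An in-process implementation lives under broadcasters/mem. As a minimal, channel-backed sketch of what satisfying this interface can look like (illustration only; it delivers each Event to a single consumer, unlike a real broadcaster that fans out to every replica):

// chanBroadcaster is an illustrative, channel-backed Broadcaster. It is a
// sketch of the interface, not a substitute for the broadcasters/mem
// implementation.
type chanBroadcaster struct {
	ch chan *types.Event
}

func (b *chanBroadcaster) Broadcast(ctx context.Context, ev *types.Event) error {
	select {
	case b.ch <- ev:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *chanBroadcaster) Next(ctx context.Context) (*types.Event, error) {
	select {
	case ev, ok := <-b.ch:
		if !ok {
			return nil, errors.New("broadcaster closed")
		}
		return ev, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (b *chanBroadcaster) Close() error {
	close(b.ch)
	return nil
}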

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics() *Metrics

type NewEventFunc

type NewEventFunc func(ev *types.Event)

type Replica

type Replica struct {
	// contains filtered or unexported fields
}

Replica manages the DAG of a dataset replica.

func NewReplica

func NewReplica(ctx context.Context, metrics *Metrics, store Store, bc Broadcaster, syncer Syncer, onNewEvent NewEventFunc) (*Replica, error)
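
A hedged wiring sketch: newStore, newBroadcaster and newSyncer below are placeholders for whatever concrete implementations are in use (for example the in-memory ones under stores/mem, broadcasters/mem and syncers/mem); they are not functions of this package.

// newReplicaSketch shows how the pieces fit together. The placeholder
// constructors and the assumption that onNewEvent fires for Events added to
// the DAG are illustrative, not part of this package's documented contract.
func newReplicaSketch(ctx context.Context) (*Replica, error) {
	onNewEvent := func(ev *types.Event) {
		// react to an Event being added to the replica
	}
	return NewReplica(ctx, NewMetrics(), newStore(), newBroadcaster(), newSyncer(), onNewEvent)
}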

func (*Replica) BroadcastAppend

func (r *Replica) BroadcastAppend(ctx context.Context, env *messagev1.Envelope) (*types.Event, error)

func (*Replica) Close

func (r *Replica) Close()

func (*Replica) GetEvents

func (r *Replica) GetEvents(ctx context.Context, cids ...mh.Multihash) ([]*types.Event, error)

func (*Replica) Query

type Store

type Store interface {
	// AppendEvent creates and stores a new Event,
	// making the current heads its links and
	// replacing the heads with the new Event.
	// Returns the new Event.
	AppendEvent(ctx context.Context, env *messagev1.Envelope) (*types.Event, error)

	// InsertEvent stores the Event if it isn't known yet.
	// Returns whether it was actually added.
	InsertEvent(ctx context.Context, ev *types.Event) (bool, error)

	// InsertHead stores the Event if it isn't known yet,
	// and adds it to the heads.
	// Returns whether it was actually added.
	InsertHead(ctx context.Context, ev *types.Event) (bool, error)

	// RemoveHead checks if we already have the event,
	// and also removes it from heads if it's there.
	// Returns whether we already have the event or not.
	RemoveHead(ctx context.Context, cid multihash.Multihash) (bool, error)

	// GetEvents returns the set of events matching the given set of CIDs.
	GetEvents(ctx context.Context, cids ...multihash.Multihash) ([]*types.Event, error)

	// FindMissingLinks scans the whole topic for links
	// pointing to Events that are not present in the topic.
	// Returns the list of all missing links.
	FindMissingLinks(ctx context.Context) ([]multihash.Multihash, error)

	// Query returns a set of envelopes matching the query request criteria.
	Query(ctx context.Context, req *messagev1.QueryRequest) (*messagev1.QueryResponse, error)
}

Store represents the storage capacity for a specific CRDT.
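
As a rough mental model of the local append path (a sketch, not the actual implementation of Replica.BroadcastAppend, which may differ):

// appendAndBroadcast sketches the local append path: the store turns the
// envelope into a new Event linked to the current heads, and the broadcaster
// announces it to the network. Illustration only.
func appendAndBroadcast(ctx context.Context, store Store, bc Broadcaster, env *messagev1.Envelope) (*types.Event, error) {
	ev, err := store.AppendEvent(ctx, env)
	if err != nil {
		return nil, err
	}
	if err := bc.Broadcast(ctx, ev); err != nil {
		return nil, err
	}
	return ev, nil
}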

type Syncer

type Syncer interface {
	// Fetch retrieves a set of Events from the network DHT.
	// It is a single attempt that can fail completely or return only some
	// of the requested events. If there is no error, the resulting slice is always
	// the same size as the CID slice, but there can be some nils instead of Events in it.
	Fetch(context.Context, []multihash.Multihash) ([]*types.Event, error)

	// Close gracefully closes the syncer.
	Close() error
}

Syncer provides syncing capability to a specific replica.
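
Because a single Fetch may return only some of the requested Events (with nils in the other positions), callers usually filter the result and retry what is still missing. A sketch:

// fetchAll sketches driving Fetch until every requested CID is resolved,
// retrying whatever came back nil. A real caller would add backoff and give
// up at some point; this is an illustration only.
func fetchAll(ctx context.Context, s Syncer, cids []multihash.Multihash) ([]*types.Event, error) {
	var events []*types.Event
	for len(cids) > 0 {
		fetched, err := s.Fetch(ctx, cids)
		if err != nil {
			return nil, err
		}
		var missing []multihash.Multihash
		for i, ev := range fetched {
			if ev == nil {
				missing = append(missing, cids[i]) // not found this attempt
				continue
			}
			events = append(events, ev)
		}
		if len(missing) == len(cids) {
			return events, errors.New("no progress fetching remaining events")
		}
		cids = missing
	}
	return events, nil
}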

Directories

Path Synopsis
broadcasters
  mem
stores
  mem
syncers
  mem
