yjs

v0.1.6
Published: Apr 19, 2026 License: MIT Imports: 8 Imported by: 0

README

yjs-go

A Go implementation of the Yjs CRDT protocol.

Yjs is a conflict-free replicated data type (CRDT) framework. Two peers can edit shared data simultaneously without coordination and converge to the same state when they exchange updates. This package implements the core algorithm, the binary v1 wire format, the awareness protocol, and a WebSocket client that speaks the same protocol as y-websocket.

Status: alpha. The API is usable and the binary format is wire-compatible with the official Yjs implementation. Breaking changes are possible before v1.0.

Current limitations

The following features are absent or incomplete in v0.1.x:

  • update-v2 not implemented. update-v1 is the only encode/decode format. All interop tests run against the v1 wire format, which is what y-websocket uses by default.
  • XmlFragment has only Len(). No child insertion, no attribute support, no mutations. Do not use XmlFragment for XML-tree scenarios; that surface will be fleshed out in a future release.
  • Content types 2 (JSON legacy) and 9 (ContentDoc, nested docs) are decode-only stubs. They are not produced by any of the primary CRDT types (Text/Map/Array) and are only present for wire-format completeness.
  • Text.Delete straddle guard is dead code. A defensive branch inside Delete (for items that straddle the deletion start) is never reached in practice because findPositionAt pre-splits items at the target boundary. It is left in place for safety but not covered by tests. This is the remaining coverage gap after v0.1.1.

Install

go get github.com/CivNode/yjs-go

Requires Go 1.22 or later. The only non-stdlib dependency is nhooyr.io/websocket for the transport package.

Quick start

package main

import (
    "fmt"
    yjs "github.com/CivNode/yjs-go"
)

func main() {
    // Two documents on different peers.
    docA := yjs.NewDoc()
    docB := yjs.NewDoc()

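    // Capture locally produced updates so we can exchange them. The origin
    // check skips transactions with a non-nil origin (such as "remote"),
    // in case applying a remote update also invokes the handler.
    var updA, updB []byte
    docA.OnUpdate(func(u []byte, origin interface{}) {
        if origin == nil {
            updA = u
        }
    })
    docB.OnUpdate(func(u []byte, origin interface{}) {
        if origin == nil {
            updB = u
        }
    })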

    textA := docA.GetText("code")
    textB := docB.GetText("code")

    // Each peer writes concurrently.
    docA.Transact(func() { textA.Insert(0, "Hello") }, nil)
    docB.Transact(func() { textB.Insert(0, "World") }, nil)

    // Exchange updates — order doesn't matter.
    yjs.ApplyUpdate(docA, updB, "remote")
    yjs.ApplyUpdate(docB, updA, "remote")

    // Both peers converge to the same string.
    fmt.Println(textA.String()) // e.g. "WorldHello" or "HelloWorld" — deterministic
    fmt.Println(textB.String()) // same as textA
}

Shared types

Text

text := doc.GetText("name")
doc.Transact(func() {
    text.Insert(0, "hello")
    text.Delete(0, 2) // removes "he"
}, nil)
fmt.Println(text.String()) // "llo"

Map

m := doc.GetMap("meta")
doc.Transact(func() {
    m.Set("author", "Alice")
    m.Set("version", int64(3))
}, nil)
v, ok := m.Get("author") // "Alice", true
doc.Transact(func() { m.Delete("version") }, nil) // mutations belong inside a transaction
keys := m.Keys() // ["author"]

Array

a := doc.GetArray("items")
doc.Transact(func() {
    a.Push("one", "two", "three")
}, nil)
doc.Transact(func() {
    a.Insert(1, "one-and-a-half")
    a.Delete(0, 1)
}, nil)
fmt.Println(a.ToSlice()) // ["one-and-a-half", "two", "three"]

Awareness

Awareness carries ephemeral per-peer state (cursor positions, user info) that is not persisted.

aw := yjs.NewAwareness(doc)
defer aw.Destroy()

aw.SetLocalState(map[string]interface{}{
    "user": "Alice",
    "cursor": int64(42),
})

aw.OnChange(func(added, updated, removed []uint64, origin interface{}) {
    fmt.Printf("peers changed: +%v ~%v -%v\n", added, updated, removed)
})

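// Encode and send to a peer.
encoded := aw.EncodeAwarenessUpdate()
// On the receiving end (aw2 is the receiving peer's *Awareness):
if err := aw2.ApplyUpdate(encoded, "remote"); err != nil {
    log.Printf("awareness update rejected: %v", err)
}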

WebSocket transport

yjs-go/transport implements the y-websocket protocol so Go clients can join rooms hosted by any y-websocket-compatible relay.

import "github.com/CivNode/yjs-go/transport"

doc := yjs.NewDoc()
conn, err := transport.Connect(ctx, doc, "ws://localhost:1234", "my-room", "")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

// Local mutations are broadcast automatically.
doc.Transact(func() { doc.GetText("code").Insert(0, "hello") }, nil)

// Subscribe to remote updates.
conn.OnUpdate(func(update []byte) {
    fmt.Printf("remote update: %d bytes\n", len(update))
})

transport.NewRelay() returns an http.Handler that you can use in tests or embed in your own server:

relay := transport.NewRelay()
http.Handle("/", relay)
http.ListenAndServe(":1234", nil)

Sync protocol

yjs-go/protocol has helpers for the y-protocols sync message framing:

import "github.com/CivNode/yjs-go/protocol"

// Encode
step1 := protocol.EncodeSyncStep1(stateVector)
step2 := protocol.EncodeSyncStep2(update)
upd   := protocol.EncodeUpdate(update)

// Decode
msg, err := protocol.ReadSyncMessage(r)
// msg.Type: 0=step1, 1=step2, 2=update
// msg.Payload: stateVector or update bytes
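
The framing those helpers deal with can be sketched with the standard library alone. The sketch below assumes the y-protocols layout of varuint(type), varuint(payload length), payload, and leaves out the outer message-type byte that y-websocket adds. The names encodeSyncMessage and decodeSyncMessage are hypothetical and are not part of yjs-go/protocol.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Message types, matching the values listed for msg.Type.
const (
	syncStep1  uint64 = 0
	syncStep2  uint64 = 1
	syncUpdate uint64 = 2
)

// encodeSyncMessage writes varuint(type), varuint(len(payload)), payload.
// The layout is an assumption based on y-protocols, not taken from yjs-go.
func encodeSyncMessage(msgType uint64, payload []byte) []byte {
	buf := binary.AppendUvarint(nil, msgType)
	buf = binary.AppendUvarint(buf, uint64(len(payload)))
	return append(buf, payload...)
}

// decodeSyncMessage reverses encodeSyncMessage and rejects truncated input.
func decodeSyncMessage(b []byte) (uint64, []byte, error) {
	typ, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, nil, errors.New("sync: bad message type")
	}
	b = b[n:]
	size, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, nil, errors.New("sync: bad payload length")
	}
	b = b[n:]
	if uint64(len(b)) < size {
		return 0, nil, errors.New("sync: truncated payload")
	}
	return typ, b[:size], nil
}

func main() {
	msg := encodeSyncMessage(syncUpdate, []byte{0xAA, 0xBB})
	typ, payload, err := decodeSyncMessage(msg)
	fmt.Println(typ, len(payload), err)
}
```

Working on byte slices keeps the sketch short; a streaming reader would follow the same three steps.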

Encoding helpers

sv, _  := yjs.EncodeStateVector(doc)             // compact state vector
upd, _ := yjs.EncodeStateAsUpdate(doc, remoteSV) // diff since remoteSV
yjs.ApplyUpdate(doc, upd, "remote")               // integrate an update

// Snapshot the full document state and restore it on a new Doc.
snap, _ := doc.Snapshot()
doc2, _ := yjs.RestoreDoc(snap)

Benchmarks

On an AMD Ryzen 9 3950X (amd64, Go 1.22):

BenchmarkTextInsert1k           870    1.29 ms/op    456 KB/op   10070 allocs/op
BenchmarkTextInsert10k            9  113.6 ms/op    4908 KB/op  100101 allocs/op
BenchmarkTextInsert100k           1  14.35 s/op    54896 KB/op 1000189 allocs/op
BenchmarkTextAppend1k           416    2.87 ms/op   1002 KB/op   25017 allocs/op
BenchmarkMapUpdate1k            798    1.50 ms/op   1153 KB/op   30529 allocs/op
BenchmarkSnapshotSize/100     25305   47.36 µs/op     46 KB/op    1057 allocs/op
BenchmarkSnapshotSize/1000      931    1.38 ms/op    446 KB/op   10073 allocs/op
BenchmarkSnapshotSize/10000       9  116.4 ms/op    4907 KB/op  100104 allocs/op
BenchmarkApplyUpdate           2845  421.4 µs/op     354 KB/op   11048 allocs/op
BenchmarkEncodeStateVector  4612528  259.6 ns/op      160 B/op       5 allocs/op

Run them yourself:

go test -bench=. -benchmem ./...

Interop testing

The transport package includes tests that connect to a real Node.js y-websocket server. They are skipped by default:

cd testdata/interop && npm install
YJS_GO_INTEROP=1 go test ./transport/... -run TestInterop -v

Contributing

Issues and pull requests welcome. Please include tests for any change. Run go test ./... before submitting.

License

MIT. See LICENSE.

Documentation

Overview

Package yjs implements the Yjs CRDT protocol in Go. It provides Doc, Text, Map, Array, and XmlFragment types with binary protocol support (update-v1, sync, awareness) compatible with the JavaScript yjs library and y-websocket relay. update-v2 is not yet implemented; update-v1 is the only supported format for v0.1.x.

Functions

func ApplyUpdate

func ApplyUpdate(d *Doc, update []byte, origin interface{}) error

ApplyUpdate applies a v1 binary update received from a remote peer.

func EncodeStateAsUpdate

func EncodeStateAsUpdate(d *Doc, encodedStateVector []byte) ([]byte, error)

EncodeStateAsUpdate encodes the full document state as a v1 update, optionally filtered to only items after encodedStateVector.

func EncodeStateVector

func EncodeStateVector(d *Doc) ([]byte, error)

EncodeStateVector returns the binary v1 state vector for document d.
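
To make the idea of a state vector concrete, here is a minimal stdlib-only sketch. It assumes the v1 layout used by the JavaScript implementation: varuint(count) followed by (varuint(client), varuint(clock)) pairs, where clock is the next clock expected from that client. The function names are hypothetical and the ascending sort is only there to make the output deterministic; it is not a claim about the order this package emits.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
)

// encodeStateVector writes varuint(count), then (client, clock) pairs.
// Assumed layout; entry order here is chosen for determinism only.
func encodeStateVector(sv map[uint64]uint64) []byte {
	clients := make([]uint64, 0, len(sv))
	for c := range sv {
		clients = append(clients, c)
	}
	sort.Slice(clients, func(i, j int) bool { return clients[i] < clients[j] })

	buf := binary.AppendUvarint(nil, uint64(len(clients)))
	for _, c := range clients {
		buf = binary.AppendUvarint(buf, c)
		buf = binary.AppendUvarint(buf, sv[c])
	}
	return buf
}

// decodeStateVector parses the layout produced by encodeStateVector.
func decodeStateVector(b []byte) (map[uint64]uint64, error) {
	count, n := binary.Uvarint(b)
	if n <= 0 {
		return nil, errors.New("state vector: bad count")
	}
	b = b[n:]
	sv := make(map[uint64]uint64)
	for i := uint64(0); i < count; i++ {
		client, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, errors.New("state vector: bad client")
		}
		b = b[n:]
		clock, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, errors.New("state vector: bad clock")
		}
		b = b[n:]
		sv[client] = clock
	}
	return sv, nil
}

func main() {
	enc := encodeStateVector(map[uint64]uint64{1: 5, 300: 1000})
	dec, err := decodeStateVector(enc)
	fmt.Printf("% x %v %v\n", enc, dec, err)
}
```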

Types

type Array

type Array struct {
	// contains filtered or unexported fields
}

Array is a CRDT ordered sequence of arbitrary values.

func (*Array) Delete

func (a *Array) Delete(index, length uint64)

Delete removes length items starting at index.

func (*Array) Get

func (a *Array) Get(index uint64) (interface{}, bool)

Get returns the value at index, or (nil, false) if out of range.

func (*Array) Insert

func (a *Array) Insert(index uint64, values ...interface{})

Insert inserts values at the given index.

func (*Array) Len

func (a *Array) Len() uint64

Len returns the number of non-deleted elements.

func (*Array) Observe

func (a *Array) Observe(fn func(*ArrayEvent))

Observe registers a handler for changes to this Array.

func (*Array) Push

func (a *Array) Push(values ...interface{})

Push appends values to the end of the array.

func (*Array) ToSlice

func (a *Array) ToSlice() []interface{}

ToSlice returns all non-deleted values as a slice.

type ArrayEvent

type ArrayEvent struct {
	Origin interface{}
}

ArrayEvent carries information about a change to an Array type.

type Awareness

type Awareness struct {
	// contains filtered or unexported fields
}

Awareness implements the Yjs awareness protocol: ephemeral shared state (cursors, user info) keyed by clientID. The binary format is: varuint(count), (varuint(clientID), varuint(clock), varstring(JSON(state)))*.
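
The binary layout described above can be reproduced with the standard library, since lib0 varuints are unsigned LEB128 and match Go's encoding/binary uvarint. The following is a sketch only: peerState and encodeAwareness are hypothetical names, and entries are sorted by client ID purely so the example is deterministic.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"sort"
)

// peerState is a hypothetical stand-in for one awareness entry.
type peerState struct {
	Clock uint64
	State map[string]interface{}
}

// appendVarString writes varuint(byte length) followed by the UTF-8 bytes.
func appendVarString(buf []byte, s string) []byte {
	buf = binary.AppendUvarint(buf, uint64(len(s)))
	return append(buf, s...)
}

// encodeAwareness writes varuint(count), then for each client
// varuint(clientID), varuint(clock), varstring(JSON(state)).
func encodeAwareness(states map[uint64]peerState) ([]byte, error) {
	ids := make([]uint64, 0, len(states))
	for id := range states {
		ids = append(ids, id)
	}
	// Sorted for a deterministic example; not a statement about yjs-go.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	buf := binary.AppendUvarint(nil, uint64(len(ids)))
	for _, id := range ids {
		st := states[id]
		js, err := json.Marshal(st.State) // a nil state marshals to "null"
		if err != nil {
			return nil, err
		}
		buf = binary.AppendUvarint(buf, id)
		buf = binary.AppendUvarint(buf, st.Clock)
		buf = appendVarString(buf, string(js))
	}
	return buf, nil
}

func main() {
	out, err := encodeAwareness(map[uint64]peerState{
		7: {Clock: 1, State: map[string]interface{}{"user": "Alice"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", out)
}
```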

func NewAwareness

func NewAwareness(doc *Doc) *Awareness

NewAwareness creates an Awareness instance bound to doc.

func (*Awareness) ApplyUpdate

func (a *Awareness) ApplyUpdate(data []byte, origin interface{}) error

ApplyUpdate applies a remote awareness update binary.

func (*Awareness) Destroy

func (a *Awareness) Destroy()

Destroy cleans up the awareness instance (removes local state).

func (*Awareness) EncodeAwarenessUpdate

func (a *Awareness) EncodeAwarenessUpdate() []byte

EncodeAwarenessUpdate encodes all current states (all known clients).

func (*Awareness) EncodeUpdate

func (a *Awareness) EncodeUpdate(clientIDs []uint64) []byte

EncodeUpdate encodes the awareness states for the given client IDs as binary. Binary format: varuint(count), then (varuint(clientID), varuint(clock), varstring(JSON))*.

func (*Awareness) GetLocalState

func (a *Awareness) GetLocalState() map[string]interface{}

GetLocalState returns this client's current state, or nil if removed.

func (*Awareness) GetStates

func (a *Awareness) GetStates() map[uint64]map[string]interface{}

GetStates returns a snapshot of all known client states.

func (*Awareness) OnChange

func (a *Awareness) OnChange(h ChangeHandler)

OnChange registers a handler for awareness state changes.

func (*Awareness) SetLocalState

func (a *Awareness) SetLocalState(state map[string]interface{})

SetLocalState sets this client's local state. Pass nil to remove.

func (*Awareness) SetLocalStateField

func (a *Awareness) SetLocalStateField(field string, value interface{})

SetLocalStateField sets a single field in the local state map.

type ChangeHandler

type ChangeHandler func(added, updated, removed []uint64, origin interface{})

ChangeHandler is called when awareness states change. added/updated/removed are slices of clientIDs that changed.
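
One plausible way to derive the three slices is to compare two snapshots of the state map, as sketched below. This is an illustration under assumptions, not the package's implementation: diffStates is a hypothetical helper, and a real awareness implementation may classify changes by clock rather than by deep equality.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// states mirrors the shape returned by GetStates.
type states map[uint64]map[string]interface{}

// diffStates reports which client IDs appeared, changed, or disappeared
// between prev and next. Each result slice is sorted ascending.
func diffStates(prev, next states) (added, updated, removed []uint64) {
	for id, st := range next {
		old, ok := prev[id]
		switch {
		case !ok:
			added = append(added, id)
		case !reflect.DeepEqual(old, st):
			updated = append(updated, id)
		}
	}
	for id := range prev {
		if _, ok := next[id]; !ok {
			removed = append(removed, id)
		}
	}
	for _, ids := range [][]uint64{added, updated, removed} {
		ids := ids
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	return added, updated, removed
}

func main() {
	prev := states{1: {"cursor": 1}, 2: {"cursor": 9}}
	next := states{1: {"cursor": 4}, 3: {"cursor": 0}}
	added, updated, removed := diffStates(prev, next)
	fmt.Printf("peers changed: +%v ~%v -%v\n", added, updated, removed)
}
```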

type Doc

type Doc struct {
	// contains filtered or unexported fields
}

Doc is the root Yjs document. It owns a StructStore, manages transactions, and holds named shared types (Text, Map, Array, XmlFragment).

func NewDoc

func NewDoc() *Doc

NewDoc creates a new document with a random clientID.

ClientIDs are 32-bit unsigned integers (matching Yjs/lib0 behavior) so they remain within JavaScript's safe-integer range for cross-runtime interop.
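
The reasoning can be checked with a short sketch: a value drawn from 32 random bits always fits below JavaScript's Number.MAX_SAFE_INTEGER (2^53 - 1), even when it is stored in a uint64. The helper randomClientID is hypothetical, and the use of crypto/rand is an assumption; the package may pick its IDs differently.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// maxSafeInteger is JavaScript's Number.MAX_SAFE_INTEGER.
const maxSafeInteger uint64 = 1<<53 - 1

// randomClientID draws 32 random bits and widens them to uint64.
func randomClientID() (uint64, error) {
	var b [4]byte
	if _, err := rand.Read(b[:]); err != nil {
		return 0, err
	}
	return uint64(binary.BigEndian.Uint32(b[:])), nil
}

func main() {
	id, err := randomClientID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id, id <= maxSafeInteger)
}
```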

func NewDocWithClientID

func NewDocWithClientID(clientID uint64) *Doc

NewDocWithClientID creates a document with a specific clientID. Useful for tests.

func RestoreDoc added in v0.1.1

func RestoreDoc(snapshot []byte) (*Doc, error)

RestoreDoc reconstructs a Doc from a snapshot produced by Doc.Snapshot.

func (*Doc) ClientID

func (d *Doc) ClientID() uint64

ClientID returns this document's unique peer identifier.

func (*Doc) GetArray

func (d *Doc) GetArray(name string) *Array

GetArray returns the named Array type, creating it if it does not exist.

func (*Doc) GetMap

func (d *Doc) GetMap(name string) *Map

GetMap returns the named Map type, creating it if it does not exist.

func (*Doc) GetText

func (d *Doc) GetText(name string) *Text

GetText returns the named Text type, creating it if it does not exist.

func (*Doc) GetXmlFragment

func (d *Doc) GetXmlFragment(name string) *XmlFragment

GetXmlFragment returns the named XmlFragment type, creating it if it does not exist.

In v0.1.x the returned fragment supports only Len(); see the XmlFragment godoc. Use GetText, GetMap, or GetArray for functional shared types until XmlFragment mutation lands in v0.2.

func (*Doc) OnUpdate

func (d *Doc) OnUpdate(h UpdateHandler) func()

OnUpdate registers a handler that is called after each transaction with the binary v1 update bytes that should be broadcast to peers.

It returns an unsubscribe function. Call it to remove the handler; subsequent transactions will not invoke it. This prevents handler leaks across connect/disconnect cycles when the same Doc is reused.
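
The subscribe-and-unsubscribe pattern described here can be sketched independently of the package. The registry type below is hypothetical and only illustrates why returning a closure avoids handler leaks: each registration gets its own key, and the closure removes exactly that entry.

```go
package main

import (
	"fmt"
	"sync"
)

type updateHandler func(update []byte, origin interface{})

// registry is a hypothetical handler list, not the Doc's real internals.
type registry struct {
	mu       sync.Mutex
	nextID   int
	handlers map[int]updateHandler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[int]updateHandler)}
}

// on registers h and returns a function that removes it again.
// Calling the returned function more than once is harmless.
func (r *registry) on(h updateHandler) func() {
	r.mu.Lock()
	id := r.nextID
	r.nextID++
	r.handlers[id] = h
	r.mu.Unlock()
	return func() {
		r.mu.Lock()
		delete(r.handlers, id)
		r.mu.Unlock()
	}
}

// emit calls every registered handler outside the lock.
func (r *registry) emit(update []byte, origin interface{}) {
	r.mu.Lock()
	hs := make([]updateHandler, 0, len(r.handlers))
	for _, h := range r.handlers {
		hs = append(hs, h)
	}
	r.mu.Unlock()
	for _, h := range hs {
		h(update, origin)
	}
}

func main() {
	r := newRegistry()
	off := r.on(func(u []byte, _ interface{}) { fmt.Println("update bytes:", len(u)) })
	r.emit([]byte{1, 2, 3}, nil) // handler runs
	off()
	r.emit([]byte{4}, nil) // handler no longer runs
}
```

With the real API the equivalent usage would be to keep the function returned by OnUpdate and call it when the connection closes.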

func (*Doc) Snapshot added in v0.1.1

func (d *Doc) Snapshot() ([]byte, error)

Snapshot returns an opaque byte slice encoding the full document state. Pass the result to RestoreDoc to reconstruct a Doc with the same state.

func (*Doc) Transact

func (d *Doc) Transact(fn func(), origin interface{})

Transact runs fn inside a transaction. All structural changes must happen inside a transaction. Transactions batch items, then on commit produce the binary update and fire OnUpdate handlers.
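
A toy model of the batching behaviour may help. The sketch below is not the package's implementation: doc, record, and transact are hypothetical names, operations are plain strings, and the handling of nested calls (only the outermost one commits) is an assumption.

```go
package main

import "fmt"

// doc is a toy document that batches operations per transaction.
type doc struct {
	depth    int
	pending  []string
	onCommit func(batch []string)
}

// record queues an operation; it must be called inside transact.
func (d *doc) record(op string) {
	if d.depth == 0 {
		panic("change outside a transaction")
	}
	d.pending = append(d.pending, op)
}

// transact runs fn and, when the outermost call returns with queued
// operations, hands the whole batch to onCommit exactly once.
func (d *doc) transact(fn func()) {
	d.depth++
	fn()
	d.depth--
	if d.depth > 0 || len(d.pending) == 0 {
		return
	}
	batch := d.pending
	d.pending = nil
	if d.onCommit != nil {
		d.onCommit(batch)
	}
}

func main() {
	d := &doc{onCommit: func(batch []string) { fmt.Println("commit:", batch) }}
	d.transact(func() {
		d.record("insert hello")
		d.record("delete 0..2")
	})
}
```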

type ID

type ID struct {
	Client uint64
	Clock  uint64
}

ID uniquely identifies a struct in the document. Client is the peer's numeric ID; Clock is monotonically increasing per client.

func (ID) String

func (id ID) String() string

type Item

type Item struct {
	ID ID

	// Left and Right are the previous/next live items in insertion order.
	Left  *Item
	Right *Item

	// OriginLeft and OriginRight are the IDs of the items that were to the
	// left and right of this item when it was created (causal context for
	// the YATA conflict resolution algorithm).
	OriginLeft  *ID
	OriginRight *ID

	// Parent is the YType that owns this item.
	Parent interface{} // *Text | *Map | *Array | etc.

	// ParentSub is the map key when this item lives inside a Map.
	ParentSub *string

	// Content: one of string, []interface{}, etc.
	Kind    contentKind
	Content interface{} // string for contentString, interface{} for contentAny, etc.
	Length  uint64

	Deleted bool
	// contains filtered or unexported fields
}

Item is a node in the doubly-linked list that forms a Yjs type's content. Each item corresponds to one Insert operation with its causal context (originLeft, originRight) for YATA-based conflict resolution.

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map is a CRDT key-value store. Values are arbitrary Go values encoded as Yjs ContentAny. Only the most-recent item per key (by clock) is live.

func (*Map) Delete

func (m *Map) Delete(key string)

Delete removes key from the map.

func (*Map) Get

func (m *Map) Get(key string) (interface{}, bool)

Get returns the value for key, or (nil, false) if absent.

func (*Map) Keys

func (m *Map) Keys() []string

Keys returns all non-deleted keys.

func (*Map) Observe

func (m *Map) Observe(fn func(*MapEvent))

Observe registers a handler for changes to this Map.

func (*Map) Set

func (m *Map) Set(key string, value interface{})

Set sets key to value in the map.

type MapEvent

type MapEvent struct {
	// Origin is the transaction origin passed to Transact.
	Origin interface{}
}

MapEvent carries information about a change to a Map type.

In v0.1.x the event carries only Origin. Per-key change details (action, old value, new value) are planned for v0.2.

type SharedType

type SharedType interface {
	// contains filtered or unexported methods
}

SharedType is implemented by Text, Map, Array, XmlFragment.

type StructStore

type StructStore struct {
	// contains filtered or unexported fields
}

StructStore holds all items per client, sorted by clock. This matches Yjs StructStore exactly.
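
Keeping each client's items sorted by clock allows lookups by binary search. The sketch below shows the idea with a simplified span type; span and findIndex are hypothetical, and the real store holds full items rather than bare clock ranges.

```go
package main

import (
	"fmt"
	"sort"
)

// span is a simplified item: it covers clocks [Clock, Clock+Length).
type span struct {
	Clock  uint64
	Length uint64
}

// findIndex returns the index of the span that contains clock.
// items must be sorted by Clock and must not overlap.
func findIndex(items []span, clock uint64) (int, bool) {
	i := sort.Search(len(items), func(i int) bool {
		return items[i].Clock+items[i].Length > clock
	})
	if i < len(items) && items[i].Clock <= clock {
		return i, true
	}
	return 0, false
}

func main() {
	items := []span{{Clock: 0, Length: 5}, {Clock: 5, Length: 3}, {Clock: 8, Length: 1}}
	idx, ok := findIndex(items, 6)
	fmt.Println(idx, ok)
}
```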

type Text

type Text struct {
	// contains filtered or unexported fields
}

Text is a CRDT text sequence. Internally it is a doubly-linked list of Items, each holding a string fragment.

func (*Text) Delete

func (t *Text) Delete(index, length uint64)

Delete removes length UTF-16 code units starting at index.

func (*Text) Insert

func (t *Text) Insert(index uint64, s string)

Insert inserts s at the given UTF-16 code-unit index (Yjs uses UTF-16 length). In practice for ASCII content, index equals the rune offset.
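
For non-ASCII content the UTF-16 index differs from both the byte offset and the rune offset. A small sketch using the standard unicode/utf16 package shows the conversion; utf16Len and utf16Offset are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// utf16Len returns the number of UTF-16 code units needed to encode s.
func utf16Len(s string) uint64 {
	return uint64(len(utf16.Encode([]rune(s))))
}

// utf16Offset converts a rune offset within s into a UTF-16 code-unit
// offset. Offsets outside the string are clamped to its bounds.
func utf16Offset(s string, runeOffset int) uint64 {
	r := []rune(s)
	if runeOffset < 0 {
		runeOffset = 0
	}
	if runeOffset > len(r) {
		runeOffset = len(r)
	}
	return uint64(len(utf16.Encode(r[:runeOffset])))
}

func main() {
	s := "a😀b" // the emoji needs a surrogate pair in UTF-16
	fmt.Println(len(s), len([]rune(s)), utf16Len(s))
	fmt.Println(utf16Offset(s, 2)) // index to pass to Insert to land before "b"
}
```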

func (*Text) Len

func (t *Text) Len() uint64

Len returns the number of visible UTF-16 code units.

func (*Text) Observe

func (t *Text) Observe(fn func(*TextEvent))

Observe registers a callback that fires after each transaction that changes this Text.

func (*Text) String

func (t *Text) String() string

String returns the current text content by walking the item list and concatenating non-deleted string items.
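
The walk can be pictured with a simplified list. The item type below is a hypothetical reduction of Item to the three fields the traversal needs; it is a sketch of the technique, not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// item is a simplified list node holding one string fragment.
type item struct {
	content string
	deleted bool
	left    *item
	right   *item
}

// link appends a new node after tail and returns it.
func link(tail *item, content string, deleted bool) *item {
	n := &item{content: content, deleted: deleted, left: tail}
	if tail != nil {
		tail.right = n
	}
	return n
}

// render walks the list from head and concatenates non-deleted fragments.
func render(head *item) string {
	var sb strings.Builder
	for n := head; n != nil; n = n.right {
		if !n.deleted {
			sb.WriteString(n.content)
		}
	}
	return sb.String()
}

func main() {
	head := link(nil, "he", true) // tombstone left behind by a delete
	link(head, "llo", false)
	fmt.Println(render(head))
}
```

Deleted fragments stay in the list as tombstones and are skipped during rendering, which is why a delete does not shift the identity of later items.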

type TextEvent

type TextEvent struct {
	// Origin is the transaction origin passed to Transact.
	Origin interface{}
}

TextEvent carries information about a change to a Text type.

In v0.1.x the event carries only Origin. Detailed delta computation (retain/insert/delete operations per the Yjs delta format) is planned for v0.2 once the observer infrastructure matures.

type UpdateHandler

type UpdateHandler func(update []byte, origin interface{})

UpdateHandler is called whenever the local document produces a new update.

type XmlFragment

type XmlFragment struct {
	// contains filtered or unexported fields
}

XmlFragment represents a shared XML tree rooted in the document.

In v0.1.x, XmlFragment is a PLACEHOLDER: only Len() is implemented. Mutations (child insertion, attribute setting, iteration) are planned for v0.2. For current work use Text, Map, or Array.

func (*XmlFragment) Len

func (x *XmlFragment) Len() uint64

Len returns the number of non-deleted child nodes.

Directories

Path        Synopsis
protocol    Package protocol implements the Yjs binary wire format: LEB128 unsigned varints, variable-length byte arrays, and UTF-8 strings, matching lib0/encoding exactly.
transport   Package transport: in-process test relay, reusable in Tier 2 integration tests.
