package module
Version: v0.0.0-...-1ed7046 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2018 License: MIT Imports: 11 Imported by: 0


Crudite 🥒🥕🌶

Go Documentation CircleCI

Crudite is a Go library for creating and managing CRDTs1 (specifically CmRDTs) using Kafka as a datastore backend.

What are CRDTs? (tl;dr)

CRDTs are data structures shared among multiple distributed nodes that provide eventual consistency without central coordination.

Why use CRDTs?

CRDTs are useful when you need distributed systems to share a data structure and eventual consistency is acceptable. Lack of central coordination means that writes scale horizontally without a single point of failure. Reads are entirely local to the node.

Supported CRDTs

  • PN Counter (Positive/Negative Counter)
  • LWW Set (Last-Write-Wins Set)

How is Kafka used?

Kafka provides a distributed, replicated log which is utilized as a broadcast mechanism for data structure operations. Data structure IDs are used as the partition key to leveraging Kafka ordering guarantees. All operations for a particular data structure are within a single Kafka partition.

How should Kafka be configured?

You need a single dedicated topic on a Kafka cluster of version 0.8+.

Topic settings:

  • Replication: a good idea but not strictly necessary. Ideal replica count depends upon desired durability and required read throughput.
  • Log cleanup: must be time-based, retention period depends upon operation volume vs available storage. Retention must be greater than SnapshotDuration.
  • Partitions: the ideal number of partitions will depend upon the number of created data structures. There is no point in having more partitions than data structures as the additional partitions will be unused. The ideal number of data structures per partition depends upon operation volume.


package main

import (


func main() {

	// Connect to brokers kafka[1,2] and use topic "crudite" (must already exist)
	ops := crudite.Options{
		Brokers: []string{"", ""},
		Topic:   "crudite",

	tm, _ := crudite.NewTypeManager(ops)

	// Initialize the type manager with any datatypes that already exist in the Kafka log

	// Get all existing CRDTs
	dtm := tm.Contains()
	for n, dt := range dtm {
		fmt.Printf("name: %v; data type: %v\n", n, dt.Type.String())

	// Create a counter. It will update in the background over time as other nodes increment/decrement it.
	c := tm.NewCounter("mycounter")

	// or get an existing counter or return error if it doesn't exist
	// c, err := tm.GetCounter("mycounter")

	// Increment it

	// Decrement it

	// Get the current value.
	val := c.Value()
	fmt.Printf("counter: %v", val)

	// Create a set
	s := tm.NewSet("myset")

	// Add an element

	// Check if an element exists
	if s.Contains([]byte("1234")) {
		fmt.Println("1234 exists in set")

	// Get all elements
	elements := s.Elements()
	for _, e := range elements {
		fmt.Printf("element: %v\n", e)

	// Shut down




View Source
const (
	DefaultSnapshotInterval = 10 * time.Minute
	DefaultOutputQueueSize  = 100 // number of outgoing log messages to queue before sending blocks

Options defaults


View Source
var ErrInitAborted = errors.New("init was aborted")

ErrInitAborted is returned when Init is aborted via context

View Source
var ErrNoSuchCounter = errors.New("no such counter found")

ErrNoSuchCounter is returned when a named counter doesn't exist

View Source
var ErrNoSuchSet = errors.New("no such set found")

ErrNoSuchSet is returned when a named set doesn't exist

View Source
var ErrSendWouldHaveBlocked = errors.New("write to output channel would have blocked")

ErrSendWouldHaveBlocked is an error indicating that the output queue was full


This section is empty.


type Counter

type Counter struct {
	Name string
	// contains filtered or unexported fields

Counter implements a PN Counter CRDT

func (*Counter) Increment

func (c *Counter) Increment(n int) error

Increment increments the counter by n. Pass a negative n to decrement.

func (*Counter) Value

func (c *Counter) Value() int

Value returns the current value of the counter

type DataStructure

type DataStructure struct {
	Name string
	Type DataType

DataStructure contains information about an extant data stucture

type DataType

type DataType int

DataType desribes the type of CRDT

const (
	PNCounter DataType = iota

Supported abstract CRDT types

func (DataType) String

func (i DataType) String() string

type LogFunc

type LogFunc func(string, ...interface{})

LogFunc is a function that logs a formatted string somewhere

type Options

type Options struct {
	Brokers            []string
	Topic              string
	OutputQueueSize    uint
	SnapshotInterval   time.Duration
	LogFunction        LogFunc
	FailOnBlockingSend bool // If output queue is full, fail instead of blocking on send

Options is the configuration for a TypeManager

type Set

type Set struct {
	Name string
	// contains filtered or unexported fields

Set implements a LWW set (removal-biased)

func (*Set) Add

func (s *Set) Add(elem []byte) error

Add adds elem to the set

func (*Set) Contains

func (s *Set) Contains(elem []byte) bool

Contains indicates whether elem exists in the set

func (*Set) Elements

func (s *Set) Elements() [][]byte

Elements returns all the elements in the set in no particular order

func (*Set) Remove

func (s *Set) Remove(elem []byte) error

Remove removes elem from the set

type TypeManager

type TypeManager struct {
	// contains filtered or unexported fields

TypeManager is an object that manages CRDT types

func NewTypeManager

func NewTypeManager(ops Options) (*TypeManager, error)

NewTypeManager returns a TypeManager using the options provided

func (*TypeManager) Contains

func (tm *TypeManager) Contains() map[string]DataStructure

Contains returns all known data structures as a map of name to DataStructure

func (*TypeManager) GetCounter

func (tm *TypeManager) GetCounter(name string) (*Counter, error)

GetCounter returns a counter that already exists in the log

func (*TypeManager) GetSet

func (tm *TypeManager) GetSet(name string) (*Set, error)

GetSet returns a set that already exists in the log

func (*TypeManager) Init

func (tm *TypeManager) Init(ctx context.Context) error

Init synchronously reads through the event logs and populates TypeManager with any existing data structures and values. If the event log is sizeable, this could block for a substantial period of time.

func (*TypeManager) NewCounter

func (tm *TypeManager) NewCounter(name string) *Counter

NewCounter creates and returns a new Counter if it doesn't exist, or the existing counter with that name.

func (*TypeManager) NewSet

func (tm *TypeManager) NewSet(name string) *Set

NewSet creates and returns a new Set if it doesn't exist, or the existing set with that name.

func (*TypeManager) Stop

func (tm *TypeManager) Stop()

Stop shuts down the TypeManager


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL