crdt

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package crdt implements the Cloudstate CRDT state model support.

Index

Constants

This section is empty.

Variables

View Source
var ErrCtxFailCalled = errors.New("context failed")
View Source
var ErrStateChanged = errors.New("CRDT change not allowed")

Functions

This section is empty.

Types

type CRDT

type CRDT interface {
	Delta() *entity.CrdtDelta
	HasDelta() bool
	// contains filtered or unexported methods
}

type CancelFunc

type CancelFunc func(c *CommandContext) error

type ChangeFunc

type ChangeFunc func(c *CommandContext) (*any.Any, error)

type Clock

type Clock uint64
const (
	// The Default clock, uses the current system time as the clock value.
	Default Clock = iota

	// A Reverse clock, based on the system clock. Using this effectively
	// achieves First-Write-Wins semantics. This is susceptible to the
	// same clock skew problems as the default clock.
	Reverse

	// A custom clock.
	// The custom clock value is passed by using the customClockValue parameter on
	// the `SetWithClock` method. The value should be a domain specific monotonically
	// increasing value. For example, if the source of the value for this register
	// is a single device, that device may attach a sequence number to each update,
	// that sequence number can be used to guarantee that the register will converge
	// to the last update emitted by that device.
	Custom

	// CustomAutoIncrement is a custom clock, that automatically increments the
	// custom value if the local clock value is greater than it.
	//
	// This is like `Custom`, however if when performing the update in the proxy,
	// it's found that the clock value of the register is greater than the specified
	// clock value for the update, the proxy will instead use the current clock
	// value of the register plus one.
	//
	// This can guarantee that updates done on the same node will be causally
	// ordered (addressing problems caused by the system clock being adjusted),
	// but will not guarantee causal ordering for updates on different nodes,
	// since it's possible that an update on a different node has not yet been
	// replicated to this node.
	CustomAutoIncrement
)

type CommandContext

type CommandContext struct {
	*Context
	CommandID CommandID
	// contains filtered or unexported fields
}

A CommandContext carries change and cancel function handlers and other values to handle a command over different phases of a commands lifecycle.

func (*CommandContext) CancelFunc

func (c *CommandContext) CancelFunc(f CancelFunc)

CancelFunc registers an on cancel handler for this command. The registered function will be invoked if the client initiates a stream cancel. It will not be invoked if the entity cancels the stream itself. The CancelFunc may update the CRDT, and may emit side effects.

func (*CommandContext) ChangeFunc

func (c *CommandContext) ChangeFunc(f ChangeFunc)

ChangeFunc sets the function to be called whenever the CRDT is changed. For non-streamed contexts this is a `no operation`.

func (*CommandContext) Command

func (c *CommandContext) Command() *protocol.Command

Command returns the protobuf message the context is handling as a command.

func (*CommandContext) EndStream

func (c *CommandContext) EndStream()

EndStream marks a command stream to be ended.

func (*CommandContext) Forward

func (c *CommandContext) Forward(forward *protocol.Forward)

Forward forwards this command to another service. The protocol.Forward provided has to ensure it references a valid service and command.

func (*CommandContext) SideEffect

func (c *CommandContext) SideEffect(effect *protocol.SideEffect)

SideEffect adds a side effect to being emitted after the current command successfully has completed.

func (*CommandContext) Streamed

func (c *CommandContext) Streamed() bool

Streamed returns whether the command handled by the context is streamed.

func (*CommandContext) WriteConsistency added in v0.3.0

func (c *CommandContext) WriteConsistency(wc entity.CrdtWriteConsistency)

type CommandID

type CommandID int64

func (CommandID) Value

func (id CommandID) Value() int64

type Context

type Context struct {
	// EntityID is the ID of the entity.
	EntityID EntityID
	// Entity describes the instance that is used as an entity.
	Entity *Entity
	// Instance is the instance of the entity this context is for.
	Instance EntityHandler
	// contains filtered or unexported fields
}

Context holds the context of a running entity.

func (*Context) CRDT

func (c *Context) CRDT() CRDT

TODO: do we really need that?

func (*Context) Delete

func (c *Context) Delete()

Delete marks the CRDT to be deleted initiated by the user function.

func (*Context) StreamCtx

func (c *Context) StreamCtx() context.Context

StreamCtx returns the context.Context from the transport stream this context is assigned to.

type Entity

type Entity struct {
	// ServiceName is the fully qualified name of the service that implements
	// this entities interface. Setting it is mandatory.
	ServiceName ServiceName
	// EntityFunc creates a new entity.
	EntityFunc          func(id EntityID) EntityHandler
	PassivationStrategy protocol.EntityPassivationStrategy
}

Entity captures an Entity with its ServiceName. It is used to be registered as an CRDT entity on a Cloudstate instance.

func (*Entity) Options added in v0.3.0

func (e *Entity) Options(options ...Option)

type EntityHandler

type EntityHandler interface {
	HandleCommand(ctx *CommandContext, name string, msg proto.Message) (*any.Any, error)
	Default(ctx *Context) (CRDT, error)
	Set(ctx *Context, state CRDT) error
}

EntityHandler has to be implemented by any type that wants to get registered as a crdt.Entity tag::entity-handler[]

type EntityID

type EntityID string

func (EntityID) String

func (i EntityID) String() string

type Flag

type Flag struct {
	// contains filtered or unexported fields
}

A Flag is a boolean value that starts as false, and can be set to true. Once set to true, it cannot be set back to false. A flag is a very simple CRDT, the merge function is simply a boolean or over the two flag values being merged.

func NewFlag

func NewFlag() *Flag

func (Flag) Delta

func (f Flag) Delta() *entity.CrdtDelta

func (*Flag) Enable

func (f *Flag) Enable()

Enables enables this flag. Once enabled, it can't be disabled.

func (*Flag) HasDelta

func (f *Flag) HasDelta() bool

func (Flag) Value

func (f Flag) Value() bool

type GCounter

type GCounter struct {
	// contains filtered or unexported fields
}

GCounter, or Grow-only Counter, is a counter that can only be incremented. It works by tracking a separate counter value for each node, and taking the sum of the values for all the nodes to get the current counter value. Since each node only updates its own counter value, each node can coordinate those updates to ensure they are consistent. Then the merge function, if it sees two different values for the same node, simply takes the highest value, because that has to be the most recent value that the node published.

func NewGCounter

func NewGCounter() *GCounter

func (*GCounter) Delta

func (c *GCounter) Delta() *entity.CrdtDelta

func (GCounter) HasDelta

func (c GCounter) HasDelta() bool

func (*GCounter) Increment

func (c *GCounter) Increment(i uint64)

func (*GCounter) Value

func (c *GCounter) Value() uint64

type GSet

type GSet struct {
	// contains filtered or unexported fields
}

GSet, or Grow-only Set, is a set that can only have items added to it. A GSet is a very simple CRDT, its merge function is defined by taking the union of the two GSets being merged.

func NewGSet

func NewGSet() *GSet

func (*GSet) Add

func (s *GSet) Add(a *any.Any)

func (GSet) Added

func (s GSet) Added() []*any.Any

func (GSet) Delta

func (s GSet) Delta() *entity.CrdtDelta

func (GSet) HasDelta

func (s GSet) HasDelta() bool

func (GSet) Size

func (s GSet) Size() int

func (GSet) Value

func (s GSet) Value() []*any.Any

type LWWRegister

type LWWRegister struct {
	// contains filtered or unexported fields
}

LWWRegister, or Last-Write-Wins Register, is a CRDT that can hold any value, along with a clock value and node id to indicate when it was updated by which node. If two nodes have two different versions of the value, the one with the highest clock value wins. If the clock values are equal, then a stable function on the nodes is used to determine it (eg, the node with the lowest address). Note that LWWRegisters do not support partial updates of their values. If the register holds a person object, and one node updates the age property, while another concurrently updates the name property, only one of those updates will eventually win. By default, LWWRegister’s are vulnerable to clock skew between nodes. Cloudstate supports optionally providing a custom clock value should a more trustworthy ordering for updates be available.

func NewLWWRegister

func NewLWWRegister(x *any.Any) *LWWRegister

func NewLWWRegisterWithClock

func NewLWWRegisterWithClock(x *any.Any, c Clock, customClockValue int64) *LWWRegister

NewLWWRegisterWithClock uses the custom clock value if the clock selected is a custom clock. This is ignored if the clock is not a custom clock.

func (*LWWRegister) Delta

func (r *LWWRegister) Delta() *entity.CrdtDelta

func (*LWWRegister) HasDelta

func (r *LWWRegister) HasDelta() bool

func (*LWWRegister) Set

func (r *LWWRegister) Set(x *any.Any)

func (*LWWRegister) SetWithClock

func (r *LWWRegister) SetWithClock(x *any.Any, c Clock, customClockValue int64)

SetWithClock uses the custom clock value to use if the clock selected is a custom clock. This is ignored if the clock is not a custom clock.

func (*LWWRegister) Value

func (r *LWWRegister) Value() *any.Any

type ORMap

type ORMap struct {
	// contains filtered or unexported fields
}

ORMap, or Observed-Removed Map, is similar to an ORSet, with the addition that the values of the set serve as keys for a map, and the values of the map are themselves, CRDTs. When a value for the same key in an ORMap is modified concurrently on two different nodes, the values from the two nodes are merged together.

func NewORMap

func NewORMap() *ORMap

func (*ORMap) Clear

func (m *ORMap) Clear()

func (*ORMap) Delete

func (m *ORMap) Delete(key *any.Any)

func (*ORMap) Delta

func (m *ORMap) Delta() *entity.CrdtDelta

func (*ORMap) Entries added in v0.3.0

func (m *ORMap) Entries() []*ORMapEntry

func (*ORMap) Flag

func (m *ORMap) Flag(key *any.Any) (*Flag, error)

func (*ORMap) GCounter

func (m *ORMap) GCounter(key *any.Any) (*GCounter, error)

func (*ORMap) GSet

func (m *ORMap) GSet(key *any.Any) (*GSet, error)

func (*ORMap) Get

func (m *ORMap) Get(key *any.Any) CRDT

func (*ORMap) HasDelta

func (m *ORMap) HasDelta() bool

func (*ORMap) HasKey

func (m *ORMap) HasKey(x *any.Any) (hasKey bool)

func (*ORMap) Keys

func (m *ORMap) Keys() []*any.Any

func (*ORMap) LWWRegister

func (m *ORMap) LWWRegister(key *any.Any) (*LWWRegister, error)

func (*ORMap) ORMap

func (m *ORMap) ORMap(key *any.Any) (*ORMap, error)

func (*ORMap) ORSet

func (m *ORMap) ORSet(key *any.Any) (*ORSet, error)

func (*ORMap) PNCounter

func (m *ORMap) PNCounter(key *any.Any) (*PNCounter, error)

func (*ORMap) Set

func (m *ORMap) Set(key *any.Any, value CRDT)

func (*ORMap) Size

func (m *ORMap) Size() int

func (*ORMap) Vote

func (m *ORMap) Vote(key *any.Any) (*Vote, error)

type ORMapEntry added in v0.3.0

type ORMapEntry struct {
	Key   *any.Any
	Value CRDT
}

type ORSet

type ORSet struct {
	// contains filtered or unexported fields
}

ORSet, or Observed-Removed Set, is a set that can have items both added and removed from it. It is implemented by maintaining a set of unique tags for each element which are generated on addition into the set. When an element is removed, all the tags that that node currently observes are added to the removal set, so as long as there haven’t been any new additions that the node hasn’t seen when it removed the element, the element will be removed.

func NewORSet

func NewORSet() *ORSet

func (*ORSet) Add

func (s *ORSet) Add(a *any.Any)

func (ORSet) Added

func (s ORSet) Added() []*any.Any

func (*ORSet) Clear

func (s *ORSet) Clear()

func (*ORSet) Delta

func (s *ORSet) Delta() *entity.CrdtDelta

func (*ORSet) HasDelta

func (s *ORSet) HasDelta() bool

func (*ORSet) Remove

func (s *ORSet) Remove(a *any.Any)

func (ORSet) Removed

func (s ORSet) Removed() []*any.Any

func (*ORSet) Size

func (s *ORSet) Size() int

func (ORSet) Value

func (s ORSet) Value() []*any.Any

type Option added in v0.3.0

type Option func(s *Entity)

func WithPassivationStrategyTimeout added in v0.3.0

func WithPassivationStrategyTimeout(duration time.Duration) Option

type PNCounter

type PNCounter struct {
	// contains filtered or unexported fields
}

PNCounter, or Positive-Negative Counter, is a counter that can both be incremented and decremented. It works by combining two GCounters, a positive one, that tracks increments, and a negative one, that tracks decrements. The final counter value is computed by subtracting the negative GCounter from the positive GCounter.

func NewPNCounter

func NewPNCounter() *PNCounter

func (*PNCounter) Decrement

func (c *PNCounter) Decrement(d int64)

func (*PNCounter) Delta

func (c *PNCounter) Delta() *entity.CrdtDelta

func (*PNCounter) HasDelta

func (c *PNCounter) HasDelta() bool

func (*PNCounter) Increment

func (c *PNCounter) Increment(i int64)

func (*PNCounter) Value

func (c *PNCounter) Value() int64

type Server

type Server struct {
	entity.UnimplementedCrdtServer
	// contains filtered or unexported fields
}

Server is the implementation of the Server server API for the CRDT service.

func NewServer

func NewServer() *Server

NewServer returns an initialized Server.

func (*Server) Handle

func (s *Server) Handle(stream entity.Crdt_HandleServer) error

After invoking handle, the first message sent will always be a CrdtInit message, containing the entity ID, and, if it exists or is available, the current value of the entity. After that, one or more commands may be sent, as well as deltas as they arrive, and the entire value if either the entity is created, or the proxy wishes the user function to replace its entire value. The user function must respond with one reply per command in. They do not necessarily have to be sent in the same order that the commands were sent, the command ID is used to correlate commands to replies.

func (*Server) Register

func (s *Server) Register(e *Entity) error

CrdtEntities can be registered to a server that handles crdt entities by a ServiceName. Whenever a internalCRDT.Server receives an CrdInit for an instance of a crdt entity identified by its EntityID and a ServiceName, the internalCRDT.Server handles such entities through their lifecycle. The handled entities value are captured by a context that is held fo each of them.

type ServiceName

type ServiceName string

func (ServiceName) String

func (sn ServiceName) String() string

type Vote

type Vote struct {
	// contains filtered or unexported fields
}

A Vote is a CRDT which allows nodes to vote on a condition. It’s similar to a GCounter, each node has its own counter, and an odd value is considered a vote for the condition, while an even value is considered a vote against. The result of the vote is decided by taking the votes of all nodes that are currently members of the cluster (when a node leave, its vote is discarded). Multiple decision strategies can be used to decide the result of the vote, such as at least one, majority and all.

func NewVote

func NewVote() *Vote

func (*Vote) All

func (v *Vote) All() bool

All returns true if the number of votes for equals the number of voters.

func (*Vote) AtLeastOne

func (v *Vote) AtLeastOne() bool

AtLeastOne returns true if there is at least one voter for the condition.

func (*Vote) Delta

func (v *Vote) Delta() *entity.CrdtDelta

func (*Vote) HasDelta

func (v *Vote) HasDelta() bool

func (*Vote) Majority

func (v *Vote) Majority() bool

Majority returns true if the number of votes for is more than half the number of voters.

func (*Vote) SelfVote

func (v *Vote) SelfVote() bool

SelfVote is the vote of the current node, which is included in Voters and VotesFor.

func (*Vote) Vote

func (v *Vote) Vote(vote bool)

Vote votes with the given boolean for a condition.

func (*Vote) Voters

func (v *Vote) Voters() uint32

Voters is the total number of voters.

func (*Vote) VotesFor

func (v *Vote) VotesFor() uint32

VotesFor is the number of votes for.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL