streams

package
v0.0.0-...-917641f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2019 License: MIT Imports: 3 Imported by: 3

Documentation

Overview

Package streams defines convergent streams of changes

A stream is like an event emitter or source: it tracks a sequence of changes on a value. It is an immutable value that logically maps to a "Git commit". Appending a change to an event is equivalent to creating a new commit based on the previous stream.

Streams differ from event emitters or immutable values in a fundamental way: they are convergent. All streams from the same family converge to the same value

For example, consider two changes on the same initial stream value.

s := ...stream...
s1 := s.Append(change1)
s2 := s.Append(change2)

The two output streams converge in the following sense:

s1Next, c1Next := s1.Next()
s2Next, c2Next := s2.Next()
initial.Apply(nil, c1).Apply(nil, c1Next) == initial.Apply(nil, c2).Apply(nil, c2Next)

Basically, just chasing the sequence of changes from a particular stream instance is guaranteed to end with the same value as any other stream in that family.

A "family" is any stream derived from another in by means of any number of "Append" calls.

Branching

Streams support Git-like branching with local changes not automatically appearing on the parent until a call to Push.

Substream

It is possible to create sub-streams for elements rooted below the current element. For example, one can take a stream of elements and only focus on the substream of changes to the 5th element. In this case, if the parent stream has a change which splices in a few elements before 5, the sub-stream should correspondingly refer to the new indices. And any changes on the sub-stream should refer to the correct index on the parent. The Substream() method provides the implementation of this concept.

Value Streams

Streams inherently only track the actual changes and not the underlying values but most applications also need to track the current value. See Int, Bool, S16 or S8 for an example stream that tracks an underlying value backed by a Stream.

Custom stream implementations

The dotc package (https://godoc.org/github.com/dotchain/dot/x/dotc) defines a mechanism to automatically generate the Stream related types for structs, slices and unions.

Example (NewStream)
package main

import (
	"fmt"

	"github.com/dotchain/dot/changes"
	"github.com/dotchain/dot/changes/types"
	"github.com/dotchain/dot/streams"
)

func main() {
	s := streams.New()
	s.Append(changes.Splice{
		Offset: 0,
		Before: types.S8(""),
		After:  types.S8("OK "),
	})

	_, c := streams.Latest(s)
	fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c))

}
Output:

Changed: OK Hello World
Example (StreamBranching)
package main

import (
	"fmt"

	"github.com/dotchain/dot/changes"
	"github.com/dotchain/dot/changes/types"
	"github.com/dotchain/dot/streams"
)

func main() {
	val := changes.Value(types.S8("Hello World"))
	s := streams.New()
	child := streams.Branch(s)

	// update child, the changes won't be reflected on latest
	child.Append(changes.Splice{
		Offset: 0,
		Before: types.S8(""),
		After:  types.S8("OK "),
	})

	_, c := streams.Latest(s)
	fmt.Println("Latest:", val.Apply(nil, c))

	// merge child and parent, change will get reflected
	if err := child.Push(); err != nil {
		fmt.Println("Error", err)
	}

	_, c = streams.Latest(s)
	fmt.Println("Latest:", val.Apply(nil, c))

}
Output:

Latest: Hello World
Latest: OK Hello World
Example (StreamMerge)
package main

import (
	"fmt"

	"github.com/dotchain/dot/changes"
	"github.com/dotchain/dot/changes/types"
	"github.com/dotchain/dot/streams"
)

func main() {
	s := streams.New()
	s1 := s.Append(changes.Splice{
		Offset: 0,
		Before: types.S8(""),
		After:  types.S8("OK "),
	})

	_, c := streams.Latest(s)
	fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c))

	// note that this works on s, so the offset location is based
	// off "Hello World", rather than "OK Hello World"
	_ = s.Append(changes.Splice{
		Offset: len("Hello World"),
		Before: types.S8(""),
		After:  types.S8("!"),
	})

	_, c = streams.Latest(s)
	fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c))

	// now modify s1 again which is based off of "OK Hello World"
	s1.Append(changes.Splice{
		Offset: len("OK Hello World"),
		Before: types.S8(""),
		After:  types.S8("*"),
	})

	_, c = streams.Latest(s)
	fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c))

}
Output:

Changed: OK Hello World
Changed: OK Hello World!
Changed: OK Hello World!*

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bool

type Bool struct {
	Stream Stream
	Value  bool
}

Bool implements a bool stream.

func (*Bool) Latest

func (s *Bool) Latest() *Bool

Latest returns the latest non-nil entry in the stream

func (*Bool) Next

func (s *Bool) Next() (*Bool, changes.Change)

Next returns the next if there is one.

func (*Bool) Update

func (s *Bool) Update(val bool) *Bool

Update replaces the current value with the new value

type Counter

type Counter struct {
	Stream Stream
	Value  int32
}

Counter implements a counter stream.

func (*Counter) Increment

func (c *Counter) Increment(by int32) *Counter

Increment by specified amount

func (*Counter) Latest

func (c *Counter) Latest() *Counter

Latest returns the latest non-nil entry in the stream

func (*Counter) Next

func (c *Counter) Next() (*Counter, changes.Change)

Next returns the next if there is one.

func (*Counter) Update

func (c *Counter) Update(val int32) *Counter

Update replaces the current value with the new value

type Int

type Int struct {
	Stream Stream
	Value  int
}

Int implements an int stream.

func (*Int) Latest

func (s *Int) Latest() *Int

Latest returns the latest non-nil entry in the stream

func (*Int) Next

func (s *Int) Next() (*Int, changes.Change)

Next returns the next if there is one.

func (*Int) Update

func (s *Int) Update(val int) *Int

Update replaces the current value with the new value

type S16

type S16 struct {
	Stream Stream
	Value  string
}

S16 implements an UFT16 string stream

func (*S16) Latest

func (s *S16) Latest() *S16

Latest returns the latest non-nil entry in the stream

func (*S16) Move

func (s *S16) Move(offset, count, distance int) *S16

Move moves[offset:offset+count] by the provided distance to the right (or if distance is negative, to the left)

func (*S16) Next

func (s *S16) Next() (*S16, changes.Change)

Next returns the next if there is one.

func (*S16) Splice

func (s *S16) Splice(offset, count int, insert string) *S16

Splice replaces s[offset:offset+count] with the provided insert string value.

func (*S16) Update

func (s *S16) Update(val string) *S16

Update replaces the current value with the new value

type S8

type S8 struct {
	Stream Stream
	Value  string
}

S8 implements an UFT8 string stream

func (*S8) Latest

func (s *S8) Latest() *S8

Latest returns the latest non-nil entry in the stream

func (*S8) Move

func (s *S8) Move(offset, count, distance int) *S8

Move moves[offset:offset+count] by the provided distance to the right (or if distance is negative, to the left)

func (*S8) Next

func (s *S8) Next() (*S8, changes.Change)

Next returns the next if there is one.

func (*S8) Splice

func (s *S8) Splice(offset, count int, insert string) *S8

Splice replaces s[offset:offset+count] with the provided insert string value.

func (*S8) Update

func (s *S8) Update(val string) *S8

Update replaces the current value with the new value

type Stream

type Stream interface {
	// Append adds a change on top of the current change.  If
	// the current change has a Next, this is merged with the next
	// and applied to the Next instead.  That way, the change is
	// propagated all the way and applied at the end of the
	// stream.
	//
	// A listener on the stream can expect to get a change that is
	// safe to apply on top of the last change emitted.
	Append(changes.Change) Stream

	// ReverseAppend is just like Append except ReverseMerge is
	// used instead of Merge.  ReverseAppend is used to when a
	// remote change is being appended -- with the newly appended
	// change actually taking precedence over all other changes
	// that have been applied on top of the current instance.
	ReverseAppend(changes.Change) Stream

	// Next returns the change and the next stream. If no further
	// changes exist, it returns nil for both. All related stream
	// instances are guaranateed to converge -- i.e. irrespective
	// of which instance one holds, iterating over all the Next
	// values and applying them will get them all to converge  to
	// the same value.
	Next() (Stream, changes.Change)

	// Push pushes all local change up to any remote stream.
	// Does nothing if not connected to a remote stream
	Push() error

	// Pull pulls all changes from a remote stream.
	// Does nothing if not connected to a remote stream
	Pull() error

	// Undo undoes the last change on this branch
	// Does nothing if not connected to a undo stack
	Undo()

	// Redo redoes the last change on this branch.
	// Does nothing if not connected to a undo stack
	Redo()
}

Stream is an immutable type to track a sequence of changes.

A change can be "applied" to a stream instance via the Append method. This results in a new stream instance. The old and the new stream instances can both be used for further changes but they represent different states: a change applied on an earlier version of the stream will be transformed onto the latest when it is actually applied.

Logically, every stream is a change made on top of another and so forms a tree. But each stream instance is careful to not store any references to previous changes as this would cause the memory to constantly grow. Instead, each stream instance maintains a forward list -- a list of changes that will effectively get it to the same converged state as any other related stream instance.

This list can be traversed via the Next() method.

Branching

All changes made on a stream are propagated to the source. It is possible to create git-like branches using the Branch type, where the changes are cached until an explicit call to Pull or Push to move the changes between two branches.

func Branch

func Branch(upstream Stream) Stream

Branch returns a new stream based on the provided stream. All changes made on the branch are only merged upstream when Push is called explicitly and all changes made upstream are only brought into the local branch when Pull is called explicitly.

func Latest

func Latest(s Stream) (Stream, changes.Change)

Latest returns the latest stream instance and the set of changes that have taken place until then

func New

func New() Stream

New returns a new Stream

func Substream

func Substream(parent Stream, key ...interface{}) Stream

Substream creates a child stream

The sequence of keys are used as paths into the logical value and work for both array indices and object-like keys.

For instance: `streams.Substream(parent, 5, "count")` refers to the "count" field of the 5th element and any changes to it.

Note that the path provided will be kept up-to-date. In the previous example, if 10 items were inserted into the root at index 0, the path would be internally updated to [15, "count"] at that point. This guarantees that any updates to the substream get reflected at the right index of the parent stream.

Substreams may "terminate" if a parent or some higher node is simply deleted. Note that deleting the element referred to by the path itself does not cause the stream to dry up -- some element higher up needs to be replaced. Dried up streams do not hold references to anything and so will not cause garbage collection issues. The only operation that such streams still permit would be the deletion of callbacks.

func Transform

func Transform(parent Stream, append, next func(changes.Change) changes.Change) Stream

Transform creates a new stream where changes are modified before being appended or before being fetched.

Both next and append are optional.

Note that nil is allowed both as argument and return for next and append.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL