rangedb

Version: v0.12.0
Published: Jul 2, 2021 License: BSD-3-Clause Imports: 7 Imported by: 4

README

An event store database in Go. This package includes a stand-alone database and web server along with a library for embedding event sourced applications.

Examples are provided here.

Docker

docker run -p 8080:8080 inklabs/rangedb

Community

DDD-CQRS-ES slack group:

Projects using RangeDB

Documentation

Constants

const Version = "0.12.0"

Version for RangeDB.

Variables

var ErrStreamNotFound = fmt.Errorf("stream not found")

Functions

func GetEventStream

func GetEventStream(message AggregateMessage) string

GetEventStream returns the stream name for an event.

func GetStream

func GetStream(aggregateType, aggregateID string) string

GetStream returns the stream name for an aggregateType and aggregateID.

func NewRecordIterator added in v0.6.0

func NewRecordIterator(recordResult <-chan ResultRecord) *recordIterator

NewRecordIterator constructs a new rangedb.Record iterator from a ResultRecord channel.

func NewRecordIteratorWithError added in v0.12.0

func NewRecordIteratorWithError(err error) *recordIterator

func ParseStream added in v0.4.0

func ParseStream(streamName string) (aggregateType, aggregateID string)

ParseStream returns the aggregateType and aggregateID for a stream name.
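GetStream and ParseStream are inverses of each other, which a small round trip makes concrete. The sketch below uses local stand-ins (`getStream`, `parseStream`) rather than the package functions, and the `!` separator is an assumption made for illustration; the documentation above does not state the actual stream name format.

```go
package main

import (
	"fmt"
	"strings"
)

// streamSeparator is an assumption for this sketch; the documentation does
// not say which separator the package uses between type and ID.
const streamSeparator = "!"

// getStream mirrors the documented GetStream signature (hypothetical stand-in).
func getStream(aggregateType, aggregateID string) string {
	return aggregateType + streamSeparator + aggregateID
}

// parseStream mirrors the documented ParseStream signature (hypothetical
// stand-in). A name without a separator is returned as the aggregate type.
func parseStream(streamName string) (aggregateType, aggregateID string) {
	parts := strings.SplitN(streamName, streamSeparator, 2)
	if len(parts) != 2 {
		return streamName, ""
	}
	return parts[0], parts[1]
}

func main() {
	name := getStream("thing", "abc123")
	aggregateType, aggregateID := parseStream(name)
	fmt.Println(name, aggregateType, aggregateID)
}
```

Splitting on the first separator only means an aggregate ID may itself contain the separator character, at the cost of forbidding it in aggregate types.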

func PublishRecordOrCancel added in v0.6.0

func PublishRecordOrCancel(ctx context.Context, resultRecords chan ResultRecord, record *Record, timeout time.Duration) bool

PublishRecordOrCancel publishes a Record to a ResultRecord channel, or times out.
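The publish-or-time-out behavior can be sketched with a single `select`. This is not the package's implementation: `publishOrCancel`, `runDemo`, and the trimmed `Record` and `ResultRecord` types are hypothetical stand-ins that only follow the documented signature, and returning false on cancellation is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Record and ResultRecord are trimmed local copies of the documented types.
type Record struct{ EventType string }

type ResultRecord struct {
	Record *Record
	Err    error
}

// publishOrCancel (hypothetical) sends record on results and reports whether
// the send happened before the context was cancelled or the timeout elapsed.
func publishOrCancel(ctx context.Context, results chan ResultRecord, record *Record, timeout time.Duration) bool {
	// Check cancellation first so a cancelled context never publishes.
	if ctx.Err() != nil {
		return false
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case results <- ResultRecord{Record: record}:
		return true
	case <-ctx.Done():
		return false
	case <-timer.C:
		return false
	}
}

// runDemo exercises the three outcomes: a send into a free buffer slot, a
// timeout on a full buffer, and a cancelled context.
func runDemo() (sent, sentWhenFull, sentWhenCancelled bool) {
	record := &Record{EventType: "ThingWasDone"}
	timeout := 10 * time.Millisecond

	results := make(chan ResultRecord, 1)
	sent = publishOrCancel(context.Background(), results, record, timeout)
	sentWhenFull = publishOrCancel(context.Background(), results, record, timeout)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	sentWhenCancelled = publishOrCancel(ctx, make(chan ResultRecord, 1), record, timeout)
	return sent, sentWhenFull, sentWhenCancelled
}

func main() {
	fmt.Println(runDemo())
}
```

Bounding the send with a timeout keeps a producer goroutine from blocking forever when the consumer has stopped reading.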

Types

type AggregateMessage

type AggregateMessage interface {
	AggregateID() string
	AggregateType() string
}

AggregateMessage is the interface that supports building an event stream name.

type Event

type Event interface {
	AggregateMessage
	EventType() string
}

Event is the interface that defines the required event methods.
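As an illustration, a plain struct satisfies Event by providing the three methods. The interfaces below are local copies of the documented ones so the sketch compiles on its own; `ThingWasDone` and its fields are hypothetical.

```go
package main

import "fmt"

// AggregateMessage and Event are local copies of the documented interfaces.
type AggregateMessage interface {
	AggregateID() string
	AggregateType() string
}

type Event interface {
	AggregateMessage
	EventType() string
}

// ThingWasDone is a hypothetical event used only for this example.
type ThingWasDone struct {
	ID     string `json:"id"`
	Number int    `json:"number"`
}

// AggregateID identifies the aggregate instance the event belongs to.
func (e ThingWasDone) AggregateID() string { return e.ID }

// AggregateType names the kind of aggregate; constant for this event type.
func (e ThingWasDone) AggregateType() string { return "thing" }

// EventType names the event itself.
func (e ThingWasDone) EventType() string { return "ThingWasDone" }

// Compile-time check that ThingWasDone implements Event.
var _ Event = ThingWasDone{}

func main() {
	var event Event = ThingWasDone{ID: "abc123", Number: 7}
	fmt.Println(event.AggregateType(), event.AggregateID(), event.EventType())
}
```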

func NewRawEvent added in v0.5.0

func NewRawEvent(aggregateType, aggregateID, eventType string, data interface{}) Event

NewRawEvent constructs a new raw event when an event struct is unavailable or unknown.

type EventBinder added in v0.2.4

type EventBinder interface {
	Bind(events ...Event)
}

EventBinder defines how to bind events for serialization.

type EventRecord added in v0.5.0

type EventRecord struct {
	Event    Event
	Metadata interface{}
}

EventRecord stores the event and metadata to be used for persisting.

type EventTypeIdentifier added in v0.3.0

type EventTypeIdentifier interface {
	EventTypeLookup(eventTypeName string) (reflect.Type, bool)
}

EventTypeIdentifier is the interface for retrieving an event type.

type Record

type Record struct {
	AggregateType        string      `msgpack:"a" json:"aggregateType"`
	AggregateID          string      `msgpack:"i" json:"aggregateID"`
	GlobalSequenceNumber uint64      `msgpack:"g" json:"globalSequenceNumber"`
	StreamSequenceNumber uint64      `msgpack:"s" json:"streamSequenceNumber"`
	InsertTimestamp      uint64      `msgpack:"u" json:"insertTimestamp"`
	EventID              string      `msgpack:"e" json:"eventID"`
	EventType            string      `msgpack:"t" json:"eventType"`
	Data                 interface{} `msgpack:"d" json:"data"`
	Metadata             interface{} `msgpack:"m" json:"metadata"`
}

Record contains event data and metadata.

func ReadNRecords added in v0.3.0

func ReadNRecords(totalEvents uint64, f func() (RecordIterator, context.CancelFunc)) []*Record

ReadNRecords reads up to N records from the RecordIterator returned by f into a slice.

type RecordIoStream

type RecordIoStream interface {
	Read(io.Reader) RecordIterator
	Write(io.Writer, RecordIterator) <-chan error
	Bind(events ...Event)
}

RecordIoStream is the interface that (de)serializes a stream of Records.

type RecordIterator added in v0.6.0

type RecordIterator interface {
	Next() bool
	Record() *Record
	Err() error
}

RecordIterator is used to traverse a stream of record events.
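A typical traversal loops on Next, reads Record inside the loop, and checks Err once the loop ends. The sketch below assumes that contract (similar to `bufio.Scanner`); the documentation does not spell it out. `sliceIterator`, `errStoreClosed`, and `collectEventTypes` are hypothetical helpers, and `Record` is trimmed to two fields.

```go
package main

import (
	"errors"
	"fmt"
)

// Record is a trimmed local copy of the documented struct.
type Record struct {
	GlobalSequenceNumber uint64
	EventType            string
}

// RecordIterator is a local copy of the documented interface.
type RecordIterator interface {
	Next() bool
	Record() *Record
	Err() error
}

// errStoreClosed is a hypothetical error used to demonstrate Err.
var errStoreClosed = errors.New("store closed")

// sliceIterator is a hypothetical in-memory iterator. It yields its records
// in order and reports err only after the last record has been consumed.
type sliceIterator struct {
	records []*Record
	err     error
	next    int // index of the record the following Next call will yield
}

func (i *sliceIterator) Next() bool {
	if i.next >= len(i.records) {
		return false
	}
	i.next++
	return true
}

func (i *sliceIterator) Record() *Record {
	if i.next == 0 {
		return nil
	}
	return i.records[i.next-1]
}

func (i *sliceIterator) Err() error {
	if i.next < len(i.records) {
		return nil
	}
	return i.err
}

// collectEventTypes drains the iterator, then surfaces any iteration error.
func collectEventTypes(iter RecordIterator) ([]string, error) {
	var eventTypes []string
	for iter.Next() {
		eventTypes = append(eventTypes, iter.Record().EventType)
	}
	return eventTypes, iter.Err()
}

func main() {
	iter := &sliceIterator{records: []*Record{
		{GlobalSequenceNumber: 0, EventType: "ThingWasDone"},
		{GlobalSequenceNumber: 1, EventType: "AnotherWasComplete"},
	}}
	fmt.Println(collectEventTypes(iter))
}
```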

func MergeRecordIteratorsInOrder added in v0.6.0

func MergeRecordIteratorsInOrder(recordIterators []RecordIterator) RecordIterator

MergeRecordIteratorsInOrder combines record iterators into a single iterator ordered by record.GlobalSequenceNumber.
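The ordering rule is a k-way merge: hold the next unread record of every input and repeatedly emit the one with the lowest GlobalSequenceNumber. The sketch below is not the package's implementation. It assumes each input is already sorted, returns a slice instead of a RecordIterator to stay short, and ignores iterator errors; `mergeInOrder`, `sliceIterator`, and `newSliceIterator` are hypothetical names.

```go
package main

import "fmt"

// Record is a trimmed local copy of the documented struct.
type Record struct{ GlobalSequenceNumber uint64 }

// RecordIterator is a local copy of the documented interface.
type RecordIterator interface {
	Next() bool
	Record() *Record
	Err() error
}

// sliceIterator is a hypothetical in-memory iterator that never fails.
type sliceIterator struct {
	records []*Record
	next    int
}

func (i *sliceIterator) Next() bool {
	if i.next >= len(i.records) {
		return false
	}
	i.next++
	return true
}

func (i *sliceIterator) Record() *Record {
	if i.next == 0 {
		return nil
	}
	return i.records[i.next-1]
}

func (i *sliceIterator) Err() error { return nil }

// newSliceIterator builds an iterator over records with the given sequence numbers.
func newSliceIterator(sequenceNumbers ...uint64) RecordIterator {
	iter := &sliceIterator{}
	for _, n := range sequenceNumbers {
		iter.records = append(iter.records, &Record{GlobalSequenceNumber: n})
	}
	return iter
}

// mergeInOrder drains all iterators, emitting records by ascending
// GlobalSequenceNumber. Each input must already be sorted.
func mergeInOrder(iterators []RecordIterator) []*Record {
	// heads[i] is the next unread record of iterators[i], nil once exhausted.
	heads := make([]*Record, len(iterators))
	for i, iter := range iterators {
		if iter.Next() {
			heads[i] = iter.Record()
		}
	}

	var merged []*Record
	for {
		lowest := -1
		for i, head := range heads {
			if head == nil {
				continue
			}
			if lowest == -1 || head.GlobalSequenceNumber < heads[lowest].GlobalSequenceNumber {
				lowest = i
			}
		}
		if lowest == -1 {
			return merged // every iterator is exhausted
		}
		merged = append(merged, heads[lowest])
		heads[lowest] = nil
		if iterators[lowest].Next() {
			heads[lowest] = iterators[lowest].Record()
		}
	}
}

func main() {
	merged := mergeInOrder([]RecordIterator{newSliceIterator(1, 4, 5), newSliceIterator(2, 3, 6)})
	for _, record := range merged {
		fmt.Print(record.GlobalSequenceNumber, " ")
	}
	fmt.Println()
}
```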

type RecordSerializer

type RecordSerializer interface {
	Serialize(record *Record) ([]byte, error)
	Deserialize(data []byte) (*Record, error)
	Bind(events ...Event)
}

RecordSerializer is the interface that (de)serializes Records.

type RecordSubscriber

type RecordSubscriber interface {
	Accept(record *Record)
}

RecordSubscriber is the interface that defines how a projection receives Records.

type RecordSubscriberFunc added in v0.3.0

type RecordSubscriberFunc func(*Record)

The RecordSubscriberFunc type is an adapter to allow the use of ordinary functions as record subscribers. If f is a function with the appropriate signature, RecordSubscriberFunc(f) is a RecordSubscriber that calls f.

func (RecordSubscriberFunc) Accept added in v0.3.0

func (f RecordSubscriberFunc) Accept(record *Record)

Accept receives a record.
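The adapter works the same way as `http.HandlerFunc`: converting a function to RecordSubscriberFunc gives it an Accept method. The sketch below redeclares the documented types locally so it compiles on its own; the body of Accept (calling f) follows the description above, and `newEventTypeCounter` is a hypothetical helper.

```go
package main

import "fmt"

// Record is a trimmed local copy of the documented struct.
type Record struct{ EventType string }

// RecordSubscriber and RecordSubscriberFunc are local copies of the
// documented declarations.
type RecordSubscriber interface {
	Accept(record *Record)
}

type RecordSubscriberFunc func(*Record)

// Accept calls f(record), which is what makes the function a RecordSubscriber.
func (f RecordSubscriberFunc) Accept(record *Record) { f(record) }

// newEventTypeCounter (hypothetical) returns a subscriber that counts the
// records it receives per event type, along with the map it updates.
func newEventTypeCounter() (RecordSubscriber, map[string]int) {
	counts := make(map[string]int)
	subscriber := RecordSubscriberFunc(func(record *Record) {
		counts[record.EventType]++
	})
	return subscriber, counts
}

func main() {
	subscriber, counts := newEventTypeCounter()
	subscriber.Accept(&Record{EventType: "ThingWasDone"})
	subscriber.Accept(&Record{EventType: "ThingWasDone"})
	subscriber.Accept(&Record{EventType: "AnotherWasComplete"})
	fmt.Println(counts)
}
```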

type RecordSubscription added in v0.7.0

type RecordSubscription interface {
	// Start returns immediately after subscribing only to new events in a goroutine.
	Start() error

	// StartFrom blocks until all previous events have been processed, then returns after subscribing to new events in a goroutine.
	StartFrom(globalSequenceNumber uint64) error

	// Stop cancels the subscription and stops.
	Stop()
}

RecordSubscription defines how a subscription starts and stops.

type ResultRecord added in v0.6.0

type ResultRecord struct {
	Record *Record
	Err    error
}

ResultRecord combines Record and error as a result struct for event queries.

type Store

type Store interface {
	EventBinder

	// Events returns a RecordIterator containing all events in the store starting with globalSequenceNumber.
	Events(ctx context.Context, globalSequenceNumber uint64) RecordIterator

	// EventsByAggregateTypes returns a RecordIterator containing all events for each aggregateType(s) starting
	// with globalSequenceNumber.
	EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) RecordIterator

	// EventsByStream returns a RecordIterator containing all events in the stream starting with streamSequenceNumber.
	EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) RecordIterator

	// OptimisticDeleteStream removes an entire stream. If the expectedStreamSequenceNumber does not match the current
	// stream sequence number, a rangedberror.UnexpectedSequenceNumber error is returned.
	OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error

	// OptimisticSave persists events to a single stream returning the new StreamSequenceNumber or an error. If
	// the expectedStreamSequenceNumber does not match the current stream sequence number,
	// a rangedberror.UnexpectedSequenceNumber error is returned.
	OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, eventRecords ...*EventRecord) (uint64, error)

	// Save persists events to a single stream returning the new StreamSequenceNumber or an error.
	Save(ctx context.Context, eventRecords ...*EventRecord) (uint64, error)

	AllEventsSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber) RecordSubscription
	AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber, aggregateTypes ...string) RecordSubscription
	TotalEventsInStream(ctx context.Context, streamName string) (uint64, error)
}

Store is the interface that stores and retrieves event records.
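The difference between Save and OptimisticSave is the sequence number check. The sketch below models it for a single in-memory stream and is not a Store implementation: `memoryStream`, its methods, and `errUnexpectedSequenceNumber` are hypothetical, events are plain strings, and it assumes the stream sequence number equals the number of events already in the stream (0 for an empty stream).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errUnexpectedSequenceNumber is a hypothetical stand-in for the
// rangedberror.UnexpectedSequenceNumber error named in the interface comments.
var errUnexpectedSequenceNumber = errors.New("unexpected sequence number")

// memoryStream is a hypothetical single-stream store. Its stream sequence
// number is assumed to be the count of events saved so far.
type memoryStream struct {
	mu     sync.Mutex
	events []string
}

// optimisticSave appends events only if expected matches the current stream
// sequence number, and returns the new stream sequence number.
func (s *memoryStream) optimisticSave(expected uint64, events ...string) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := uint64(len(s.events))
	if expected != current {
		return 0, fmt.Errorf("%w: expected %d, actual %d", errUnexpectedSequenceNumber, expected, current)
	}
	s.events = append(s.events, events...)
	return uint64(len(s.events)), nil
}

// save appends events without checking the sequence number.
func (s *memoryStream) save(events ...string) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.events = append(s.events, events...)
	return uint64(len(s.events)), nil
}

func main() {
	stream := &memoryStream{}

	sequence, err := stream.optimisticSave(0, "ThingWasDone")
	fmt.Println(sequence, err)

	// A second writer that still believes the stream is empty is rejected.
	_, err = stream.optimisticSave(0, "ThingWasDone")
	fmt.Println(errors.Is(err, errUnexpectedSequenceNumber))
}
```

The check lets two writers race on the same stream without locks held across requests: the loser gets an error, reloads the stream, and retries against the new sequence number.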
