fes

package
v0.0.0-...-98ba599 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package fes is a generated protocol buffer package.

It is generated from these files:

pkg/fes/fes.proto

It has these top-level messages:

Aggregate
Event
EventHints

Index

Constants

View Source
const (
	PubSubLabelEventID        = "event.id"
	PubSubLabelEventType      = "event.type"
	PubSubLabelAggregateType  = "aggregate.type"
	PubSubLabelAggregateID    = "aggregate.id"
	DefaultNotificationBuffer = 64
)

Variables

View Source
var (
	ErrInvalidAggregate       = EventStoreErr{S: "invalid aggregate"}
	ErrInvalidEvent           = EventStoreErr{S: "invalid event"}
	ErrInvalidEntity          = EventStoreErr{S: "invalid entity"}
	ErrEventStoreOverflow     = EventStoreErr{S: "event store out of space"}
	ErrUnsupportedEntityEvent = EventStoreErr{S: "event not supported"}
	ErrCorruptedEventPayload  = EventStoreErr{S: "failed to parse event payload"}
	ErrEntityNotFound         = EventStoreErr{S: "entity not found"}
)

Functions

func ExtractTracingFromEventMetadata

func ExtractTracingFromEventMetadata(metadata map[string]string) (opentracing.SpanContext, error)

func ParseEventData

func ParseEventData(event *Event) (proto.Message, error)

ParseEventData parses the payload of the event, returning the generic proto.Message payload.

In case it fails to parse the payload it returns an ErrCorruptedEventPayload

func ValidateAggregate

func ValidateAggregate(aggregate *Aggregate) error

func ValidateEntity

func ValidateEntity(entity Entity) error

func ValidateEvent

func ValidateEvent(event *Event) error

ValidateEvent validates the event struct.

Note: - It does not parse or check the event data - It does not check the event ID, since events that have not been persisted do not have an ID assigned yet.

Types

type Aggregate

type Aggregate struct {
	Id   string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
	Type string `protobuf:"bytes,2,opt,name=type" json:"type,omitempty"`
}

func GetAggregate

func GetAggregate(v Entity) Aggregate

func (*Aggregate) Descriptor

func (*Aggregate) Descriptor() ([]byte, []int)

func (*Aggregate) Format

func (m *Aggregate) Format() string

func (*Aggregate) GetId

func (m *Aggregate) GetId() string

func (*Aggregate) GetType

func (m *Aggregate) GetType() string

func (*Aggregate) ProtoMessage

func (*Aggregate) ProtoMessage()

func (*Aggregate) Reset

func (m *Aggregate) Reset()

func (*Aggregate) String

func (m *Aggregate) String() string

type AggregateMatcher

type AggregateMatcher func(Aggregate) bool

type Backend

type Backend interface {
	EventAppender

	// Get fetches all events that belong to a specific aggregate
	Get(aggregate Aggregate) ([]*Event, error)
	List(matcher AggregateMatcher) ([]Aggregate, error)
}

Backend is a persistent store for events

type CacheReader

type CacheReader interface {
	//Get(entity Entity) error
	List() []Aggregate

	// GetAggregate retrieves an entity by its (aggregate) key, guaranteeing that either the entity or error is non-nil.
	//
	// If the aggregate was invalid, a fes.ErrInvalidAggregate error is returned.
	// If no entity for the aggregate exists, fes.ErrNotFound error is returned
	GetAggregate(a Aggregate) (Entity, error)
}

type CacheReaderWriter

type CacheReaderWriter interface {
	CacheRefresher
	CacheReader
	CacheWriter
}

type CacheRefresher

type CacheRefresher interface {
	Refresh(key Aggregate)
}

type CacheWriter

type CacheWriter interface {
	Put(entity Entity) error
	Invalidate(entity Aggregate)
}

type CustomType

type CustomType interface {
	Type() string
}

type Entity

type Entity interface {
	// Aggregate provides type information about the entity, such as the aggregate id and the aggregate type.
	//
	// This is implemented by BaseEntity
	ID() string
}

Entity is a entity that can be updated

type Event

type Event struct {
	Id        string                     `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
	Type      string                     `protobuf:"bytes,2,opt,name=type" json:"type,omitempty"`
	Aggregate *Aggregate                 `protobuf:"bytes,3,opt,name=aggregate" json:"aggregate,omitempty"`
	Timestamp *google_protobuf.Timestamp `protobuf:"bytes,4,opt,name=timestamp" json:"timestamp,omitempty"`
	Data      *google_protobuf1.Any      `protobuf:"bytes,5,opt,name=data" json:"data,omitempty"`
	Parent    *Aggregate                 `protobuf:"bytes,6,opt,name=parent" json:"parent,omitempty"`
	Hints     *EventHints                `protobuf:"bytes,7,opt,name=hints" json:"hints,omitempty"`
	Metadata  map[string]string          `` /* 136-byte string literal not displayed */
}

func NewEvent

func NewEvent(aggregate Aggregate, payload proto.Message) (*Event, error)

NewEvent returns a new event with the provided payload for the provided aggregate or an error if the input data was invalid.

It returns one of the following errors: - ErrInvalidAggregate: provided aggregate is invalid - ErrCorruptedEventPayload: payload is empty or cannot be marshaled to bytes.

func (*Event) BelongsTo

func (m *Event) BelongsTo(parent Entity) bool

func (*Event) CreatedAt

func (m *Event) CreatedAt() time.Time

func (*Event) Descriptor

func (*Event) Descriptor() ([]byte, []int)

func (*Event) GetAggregate

func (m *Event) GetAggregate() *Aggregate

func (*Event) GetData

func (m *Event) GetData() *google_protobuf1.Any

func (*Event) GetHints

func (m *Event) GetHints() *EventHints

func (*Event) GetId

func (m *Event) GetId() string

func (*Event) GetMetadata

func (m *Event) GetMetadata() map[string]string

func (*Event) GetParent

func (m *Event) GetParent() *Aggregate

func (*Event) GetTimestamp

func (m *Event) GetTimestamp() *google_protobuf.Timestamp

func (*Event) GetType

func (m *Event) GetType() string

func (*Event) Labels

func (m *Event) Labels() labels.Labels

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) Reset

func (m *Event) Reset()

func (*Event) String

func (m *Event) String() string

type EventAppender

type EventAppender interface {
	Append(event *Event) error
}

type EventHints

type EventHints struct {
	Completed bool `protobuf:"varint,1,opt,name=completed" json:"completed,omitempty"`
}

EventHints is a collection of optional metadata that help components in the event store to improve performance.

func (*EventHints) Descriptor

func (*EventHints) Descriptor() ([]byte, []int)

func (*EventHints) GetCompleted

func (m *EventHints) GetCompleted() bool

func (*EventHints) ProtoMessage

func (*EventHints) ProtoMessage()

func (*EventHints) Reset

func (m *EventHints) Reset()

func (*EventHints) String

func (m *EventHints) String() string

type EventStoreErr

type EventStoreErr struct {
	// S is the description of the error. (required)
	S string

	// K is the aggregate related to the error. (optional)
	K *Aggregate

	// E is the event related to the error. (optional)
	E *Event

	// C is the underlying cause of the error. (optional)
	C error
}

EventStoreErr is the base error type returned by functions in the fes package.

Based on the context it provides additional information, such as the aggregate and event related to the error.

func (EventStoreErr) Error

func (err EventStoreErr) Error() string

func (*EventStoreErr) Is

func (err *EventStoreErr) Is(other error) bool

func (EventStoreErr) WithAggregate

func (err EventStoreErr) WithAggregate(aggregate *Aggregate) EventStoreErr

func (EventStoreErr) WithEntity

func (err EventStoreErr) WithEntity(entity Entity) EventStoreErr

func (EventStoreErr) WithError

func (err EventStoreErr) WithError(cause error) EventStoreErr

func (EventStoreErr) WithEvent

func (err EventStoreErr) WithEvent(event *Event) EventStoreErr

type Notification

type Notification struct {
	// Old contains the snapshot of the entity before applying the event.
	//
	// This can be nil, if the event caused the creation of the entity.
	Old Entity

	// Updated is the snapshot of entity after applying the event
	Updated Entity

	// Event is the event that triggered the notification.
	Event *Event

	// Aggregate contains the aggregate of this notification.
	//
	// It is guaranteed that the event, and old and updated snapshots match this aggregate.
	Aggregate Aggregate
}

Notification is the message send to subscribers of the event store.

It is an annotated fes.Event; it includes snapshots of the affected entity before (if applicable) and after the application of the event.

func NewNotification

func NewNotification(old Entity, new Entity, event *Event) *Notification

func (*Notification) CreatedAt

func (n *Notification) CreatedAt() time.Time

CreatedAt returns the timestamp of the event within the notification.

Necessary to conform with the pubsub.Msg interface.

func (*Notification) Labels

func (n *Notification) Labels() labels.Labels

Labels returns the labels of the event part of the notification.

Necessary to conform with the pubsub.Msg interface.

type Projector

type Projector interface {
	Project(entity Entity, events ...*Event) (updated Entity, err error)
	NewProjection(key Aggregate) (Entity, error)
}

Projector projects events onto an entity

Directories

Path Synopsis
mem
package mem contains an implementation of the fes backend using an in-memory cache.
package mem contains an implementation of the fes backend using an in-memory cache.
Package testutil is a generated protocol buffer package.
Package testutil is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL