goengine

package module
v0.0.0-...-fc38f95 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2017 License: MIT Imports: 9 Imported by: 0

README

hellofresh/engine

Build Status

Welcome to HelloFresh GoEngine!!

GoEngine provides you all the capabilities to build an Event sourced application in go. This was based on the initial project Engine for PHP

Components

Engine is divided in a few small independent components.

Install

go get -u github.com/hellofresh/goengine

Usage

Here you can check a small tutorial of how to use this component in an orders scenario.

Tutorial

Logging

GoEngine uses default log package for debug logging. If you want to use your own logger - goengine.SetLogHandler() is available. Here is how you can use, e.g. github.com/sirupsen/logrus for logging:

package main

import (
    "github.com/hellofresh/goengine"
    log "github.com/sirupsen/logrus"
)

func main() {
    goengine.SetLogHandler(func(msg string, fields map[string]interface{}, err error) {
        if nil == fields && nil == err {
            log.Debug(msg)
        } else {
            var entry *log.Entry
            if fields != nil {
                entry = log.WithFields(log.Fields(fields))
                if nil != err {
                    entry = entry.WithError(err)
                }
            } else {
                entry = log.WithError(err)
            }

            entry.Debug(msg)
        }
    })

    // do your application stuff
}

Contributing

Please see CONTRIBUTING for details.

License

The MIT License (MIT). Please see License File for more information.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Log

func Log(msg string, fields map[string]interface{}, err error)

func SetLogHandler

func SetLogHandler(handler LogHandler)

Types

type AggregateRepository

type AggregateRepository interface {
	Load(string, StreamName) (*EventStream, error)
	Save(AggregateRoot, StreamName) error
	Reconstitute(string, AggregateRoot, StreamName) error
}

type AggregateRoot

type AggregateRoot interface {
	GetID() string
	GetVersion() int
	SetVersion(int)
	Apply(DomainEvent)
	GetUncommittedEvents() []*DomainMessage
}

type AggregateRootBased

type AggregateRootBased struct {
	ID string
	// contains filtered or unexported fields
}

func NewAggregateRootBased

func NewAggregateRootBased(source interface{}) *AggregateRootBased

NewAggregateRootBased constructor

func NewEventSourceBasedWithID

func NewEventSourceBasedWithID(source interface{}, id string) *AggregateRootBased

NewEventSourceBasedWithID constructor

func (*AggregateRootBased) Apply

func (r *AggregateRootBased) Apply(event DomainEvent)

func (*AggregateRootBased) GetID

func (r *AggregateRootBased) GetID() string

func (*AggregateRootBased) GetUncommittedEvents

func (r *AggregateRootBased) GetUncommittedEvents() []*DomainMessage

func (*AggregateRootBased) GetVersion

func (r *AggregateRootBased) GetVersion() int

func (*AggregateRootBased) Record

func (r *AggregateRootBased) Record(event DomainEvent)

func (*AggregateRootBased) RecordThat

func (r *AggregateRootBased) RecordThat(event DomainEvent)

func (*AggregateRootBased) SetVersion

func (r *AggregateRootBased) SetVersion(version int)

type DomainEvent

type DomainEvent interface {
	OccurredOn() time.Time
}

type DomainMessage

type DomainMessage struct {
	ID         string      `json:"aggregate_id,omitempty"`
	Version    int         `json:"version"`
	Payload    DomainEvent `json:"payload"`
	RecordedOn time.Time   `json:"recorded_on"`
}

func NewDomainMessage

func NewDomainMessage(id string, version int, payload DomainEvent, recordedOn time.Time) *DomainMessage

func RecordNow

func RecordNow(id string, version int, payload DomainEvent) *DomainMessage

func (*DomainMessage) String

func (dm *DomainMessage) String() string

type EventStore

type EventStore interface {
	Append(events *EventStream) error
	GetEventsFor(streamName StreamName, id string) (*EventStream, error)
	FromVersion(streamName StreamName, id string, version int) (*EventStream, error)
	CountEventsFor(streamName StreamName, id string) (int, error)
}

type EventStream

type EventStream struct {
	Name   StreamName
	Events []*DomainMessage
}

func NewEventStream

func NewEventStream(name StreamName, events []*DomainMessage) *EventStream

type InMemoryTypeRegistry

type InMemoryTypeRegistry struct {
	// contains filtered or unexported fields
}

InMemoryTypeRegistry implements the in memory strategy for the registry

func NewInMemoryTypeRegistry

func NewInMemoryTypeRegistry() *InMemoryTypeRegistry

NewInMemoryTypeRegistry creates a new in memory registry

func (*InMemoryTypeRegistry) Get

func (r *InMemoryTypeRegistry) Get(name string) (interface{}, error)

Get retrieves a reflect.Type based on a name

func (*InMemoryTypeRegistry) GetTypeByName

func (r *InMemoryTypeRegistry) GetTypeByName(typeName string) (reflect.Type, bool)

func (*InMemoryTypeRegistry) RegisterAggregate

func (r *InMemoryTypeRegistry) RegisterAggregate(aggregate AggregateRoot, events ...interface{})

func (*InMemoryTypeRegistry) RegisterEvents

func (r *InMemoryTypeRegistry) RegisterEvents(events ...interface{})

func (*InMemoryTypeRegistry) RegisterType

func (r *InMemoryTypeRegistry) RegisterType(i interface{})

RegisterType adds a type in the registry

type LogHandler

type LogHandler func(msg string, fields map[string]interface{}, err error)

type MapBasedVersionedEventDispatcher

type MapBasedVersionedEventDispatcher struct {
	// contains filtered or unexported fields
}

MapBasedVersionedEventDispatcher is a simple implementation of the versioned event dispatcher. Using a map it registered event handlers to event types

func NewVersionedEventDispatcher

func NewVersionedEventDispatcher() *MapBasedVersionedEventDispatcher

NewVersionedEventDispatcher is a constructor for the MapBasedVersionedEventDispatcher

func (*MapBasedVersionedEventDispatcher) DispatchEvent

func (m *MapBasedVersionedEventDispatcher) DispatchEvent(event *DomainMessage) error

DispatchEvent executes all event handlers registered for the given event type

func (*MapBasedVersionedEventDispatcher) RegisterEventHandler

func (m *MapBasedVersionedEventDispatcher) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*MapBasedVersionedEventDispatcher) RegisterGlobalHandler

func (m *MapBasedVersionedEventDispatcher) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received

type PublisherRepository

type PublisherRepository struct {
	EventStore EventStore
	EventBus   VersionedEventPublisher
}

func NewPublisherRepository

func NewPublisherRepository(eventStore EventStore, eventBus VersionedEventPublisher) *PublisherRepository

func (*PublisherRepository) Load

func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStream, error)

func (*PublisherRepository) Reconstitute

func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, streamName StreamName) error

func (*PublisherRepository) Save

func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName StreamName) error

type StreamName

type StreamName string

type TypeRegistry

type TypeRegistry interface {
	GetTypeByName(string) (reflect.Type, bool)
	RegisterAggregate(AggregateRoot, ...interface{})
	RegisterEvents(...interface{})
	RegisterType(interface{})
	Get(string) (interface{}, error)
}

TypeRegistry is a registry for go types this is necessary since we can't create a type from a string and it's json. With this registry we can know how to create a type for that string

type VersionedEventDispatchManager

type VersionedEventDispatchManager struct {
	// contains filtered or unexported fields
}

VersionedEventDispatchManager is responsible for coordinating receiving messages from event receivers and dispatching them to the event dispatcher.

func NewVersionedEventDispatchManager

func NewVersionedEventDispatchManager(receiver VersionedEventReceiver, registry TypeRegistry) *VersionedEventDispatchManager

NewVersionedEventDispatchManager is a constructor for the VersionedEventDispatchManager

func (*VersionedEventDispatchManager) Listen

func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool) error

Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests

func (*VersionedEventDispatchManager) RegisterEventHandler

func (m *VersionedEventDispatchManager) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*VersionedEventDispatchManager) RegisterGlobalHandler

func (m *VersionedEventDispatchManager) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received

type VersionedEventDispatcher

type VersionedEventDispatcher interface {
	DispatchEvent(*DomainMessage) error
	RegisterEventHandler(event interface{}, handler VersionedEventHandler)
	RegisterGlobalHandler(handler VersionedEventHandler)
}

VersionedEventDispatcher is responsible for routing events from the event manager to call handlers responsible for processing received events

type VersionedEventHandler

type VersionedEventHandler func(*DomainMessage) error

VersionedEventHandler is a function that takes a versioned event

type VersionedEventPublisher

type VersionedEventPublisher interface {
	PublishEvents([]*DomainMessage) error
}

VersionedEventPublisher is responsible for publishing events that have been saved to the event store\repository

type VersionedEventReceiver

type VersionedEventReceiver interface {
	ReceiveEvents(VersionedEventReceiverOptions) error
}

VersionedEventReceiver is responsible for receiving globally published events

type VersionedEventReceiverOptions

type VersionedEventReceiverOptions struct {
	TypeRegistry TypeRegistry
	Close        chan chan error
	Error        chan error
	ReceiveEvent chan VersionedEventTransactedAccept
	Exclusive    bool
}

VersionedEventReceiverOptions is an initalization structure to communicate to and from an event receiver go routine

type VersionedEventTransactedAccept

type VersionedEventTransactedAccept struct {
	Event                 *DomainMessage
	ProcessedSuccessfully chan bool
}

VersionedEventTransactedAccept is the message routed from an event receiver to the event manager. Sometimes event receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL