cerk

package module
v0.2.0
Published: Apr 12, 2020 License: MIT Imports: 5 Imported by: 0

README

Purpose of the library

This library fills in a small missing piece of the CQRS pattern in Go, using Kafka.
What does that mean? Which part of the CQRS pattern is supported?

Basic CQRS with Event Sourcing looks like this:
Basic CQRS

Adding Kafka to this diagram, it should look like this:
Basic Kafka CQRS

As you can see, Kafka is responsible for both storing events and sending them via the bus.

The purpose of this library is to simplify restoring a repository's aggregate state from the Kafka stream on startup of the command (writer) service.
New events are, of course, put on the Kafka stream and sent to the reader service.

Structure

The library consists of a few elements.

Repository

type Repository interface {
	InitProvider(provider Provider, child Repository) error
	AddOrModifyEntity(entity Entity)
	GetEntity(id string) (Entity, error)
	Replay(events []Event) error
	AddNewEvent(event Event)
	GetUncommitedChanges() []Event
	Save(events []Event) error
}
  • InitProvider - should be called on startup of the service. This is where all the magic of restoring the repository happens. All events from the Kafka topic (from all partitions) are restored and aggregated here.

    Important! One of the function's arguments is the child Repository. The current inherited instance of the repository should be passed here (not the base MemoryRepository).

    provider := cerk.NewKafkaProvider("example-topic", "example-app-group", "192.168.1.151:9092")
    repository := new(examplerepository.UsersRepository)
    repository.MemoryRepository = cerk.NewMemoryRepository()
    if err := repository.InitProvider(provider, repository); err != nil {
    	panic(err)
    }
    
  • AddOrModifyEntity - creates a new entity (aggregator) or updates an existing one by id
  • GetEntity - gets an entity (aggregator) by id
  • Replay - restores/updates the state of one or more entities
  • AddNewEvent - adds a new event to the uncommitted list of events
  • GetUncommitedChanges - returns the new events created after the entity state was updated
  • Save - just sends the new events to the store/bus (a typical flow through these methods is sketched below)
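
Putting these methods together, a writer-side command handler might drive the repository roughly as follows (a hedged sketch using only the interface above and the library's GenericEvent; the handler name, event type and version handling are assumptions, not library conventions):

// Sketch only: how a writer service might use the Repository interface.
// handleUpdateUser, the event type name and the version value are assumptions.
func handleUpdateUser(r cerk.Repository, id string, payload string) error {
	entity, err := r.GetEntity(id) // fetch the current aggregate state by id
	if err != nil {
		return err
	}
	// ...mutate the concrete entity behind `entity` here...
	r.AddOrModifyEntity(entity) // put the updated aggregate back into the repository

	r.AddNewEvent(&cerk.GenericEvent{ // queue a new, still uncommitted event
		Type:         "UserUpdatedEvent",
		AggregatorId: id,
		Version:      1,
		Payload:      payload,
	})

	// Push every uncommitted event to the event store / bus (Kafka).
	return r.Save(r.GetUncommitedChanges())
}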

The library ships with an in-memory implementation of this interface - MemoryRepository. This implementation should be inherited (embedded) by your own concrete Repository, as sketched below. Check the example.
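
The InitProvider snippet above already assumes a repository type that embeds the library's MemoryRepository, roughly like this (a minimal sketch; only the embedded field is implied by that snippet):

// Minimal sketch of a concrete repository: embedding *cerk.MemoryRepository is what
// lets the snippet above assign repository.MemoryRepository = cerk.NewMemoryRepository().
type UsersRepository struct {
	*cerk.MemoryRepository
}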

Provider

type Provider interface {
	FetchAllEvents(batch int) (<-chan []Event, error)
	SendEvents(events []Event) error
}

It's a simple interface which provides functions for receiving all events (in batches) and sending events to the bus.

The library gives you a KafkaProvider implementation with the simple constructor NewKafkaProvider(topic string, groupName string, servers string).
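
Because the interface is this small, it can also be stubbed out, for example in tests without Kafka (an illustrative sketch, not part of the library):

// memoryProvider is a hypothetical in-memory Provider, useful for tests.
type memoryProvider struct {
	stored []cerk.Event
}

// FetchAllEvents streams the already stored events back as a single batch
// (the batch size argument is ignored in this sketch).
func (p *memoryProvider) FetchAllEvents(batch int) (<-chan []cerk.Event, error) {
	ch := make(chan []cerk.Event, 1)
	ch <- p.stored
	close(ch)
	return ch, nil
}

// SendEvents just appends the events instead of publishing them to a bus.
func (p *memoryProvider) SendEvents(events []cerk.Event) error {
	p.stored = append(p.stored, events...)
	return nil
}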

Event

type Event interface {
	GetType() string
	GetAggregatorId() string
	GetCreateTime() time.Time
	GetVersion() int32
	GetPayload() string

	LoadPayload() error
	SavePayload() error
	InitBy(event Event)
}

An abstraction for an event which captures a change of entity state.

  • GetType - gets the name of the event type
  • GetAggregatorId - gets the id of the entity
  • GetCreateTime - when the event was created
  • GetVersion - the version number of the current entity state
  • GetPayload - returns the raw payload message
  • LoadPayload - loads the payload, converting the JSON into the event's fields
  • SavePayload - saves the event's field values to a JSON string
  • InitBy - copies data from another event into the current one

The library provides GenericEvent, which is used to restore events from the topic. These events are then mapped to the real events by the InitBy function:

func (r *UsersRepository) Replay(events []cerk.Event) error {
	for _, e := range events {
		e.LoadPayload() // GenericEvent
		switch e.GetType() {
		case "UserCreatedEvent":
			event := new(exampleevents.UserCreatedEvent)
			event.InitBy(e) // GenericEvent -> UserCreatedEvent
			event.LoadPayload()
			// ...apply the restored event to the repository state here...
		}
	}
	return nil
}

Example implementation of event UserCreatedEvent
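
The linked example is the authoritative implementation; as a rough idea of its shape, a concrete event can embed GenericEvent and override only the payload methods (a hedged sketch that assumes the standard encoding/json package and JSON field names matching the commands below):

// Hypothetical sketch of a concrete event; see the linked example for the real code.
type UserCreatedEvent struct {
	cerk.GenericEvent `json:"-"` // promotes GetType, GetAggregatorId, GetVersion, InitBy, ...

	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
}

// LoadPayload converts the raw JSON payload into the event's own fields.
func (e *UserCreatedEvent) LoadPayload() error {
	return json.Unmarshal([]byte(e.Payload), e)
}

// SavePayload serializes the event's fields back into the raw Payload string.
func (e *UserCreatedEvent) SavePayload() error {
	raw, err := json.Marshal(e)
	if err != nil {
		return err
	}
	e.Payload = string(raw)
	return nil
}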

Entity

type Entity interface {
	GetId() string
}

Represents an aggregator in the repository. Example
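
In the simplest case an aggregator is just a struct with an id (a minimal sketch; the extra fields mirror the commands below and are assumptions):

// Minimal sketch of an Entity; see the linked example for the real code.
type User struct {
	Id        string
	FirstName string
	LastName  string
}

// GetId satisfies cerk.Entity.
func (u *User) GetId() string {
	return u.Id
}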

How to start

The best place to start is the simple-cqrs-writer example.
Its main.go creates an HTTP server which receives JSON commands:

POST http://localhost:4000
content-type: application/json

{
    "type": "CreateUserCommand",
    "first_name": "Aaaa",
    "last_name": "Bbbbb"
}

POST http://localhost:4000
content-type: application/json

{
    "type": "UpdateUserCommand",
    "id": "<id of entity>",
    "first_name": "Bbbbb",
    "last_name": "Aaaa"
}

It then processes them - creates or updates the entity, creates the events and sends them to the Kafka stream.
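
For a quick test, the first command can also be posted from Go using only the standard library (a sketch; the address and JSON body simply mirror the requests above):

// Sketch: post a CreateUserCommand to the running writer service.
// Requires "net/http" and "strings".
body := strings.NewReader(`{"type": "CreateUserCommand", "first_name": "Aaaa", "last_name": "Bbbbb"}`)
resp, err := http.Post("http://localhost:4000", "application/json", body)
if err != nil {
	panic(err)
}
defer resp.Body.Close()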

Before starting, you should set the proper Kafka server for the provider in main.go:

    provider := cerk.NewKafkaProvider("example-topic", "example-app-group", "<server>")

Start:
In the examples/simple-cqrs-writer directory, run:
go run .

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Entity

type Entity interface {
	GetId() string
}

Entity is an interface representing one aggregate in the repository

type Event

type Event interface {
	GetType() string
	GetAggregatorId() string
	GetCreateTime() time.Time
	GetVersion() int32
	GetPayload() string

	LoadPayload() error
	SavePayload() error
	InitBy(event Event)
}

Event is a basic description of an event object kept in the event store and transferred between services (usually via some bus)

type GenericEvent

type GenericEvent struct {
	Type         string
	AggregatorId string
	CreateTime   string
	Version      int32

	Payload string
}

func (*GenericEvent) GetAggregatorId

func (e *GenericEvent) GetAggregatorId() string

func (*GenericEvent) GetCreateTime

func (e *GenericEvent) GetCreateTime() time.Time

func (*GenericEvent) GetPayload

func (e *GenericEvent) GetPayload() string

func (*GenericEvent) GetType

func (e *GenericEvent) GetType() string

func (*GenericEvent) GetVersion

func (e *GenericEvent) GetVersion() int32

func (*GenericEvent) InitBy

func (e *GenericEvent) InitBy(event Event)

func (*GenericEvent) LoadPayload

func (e *GenericEvent) LoadPayload() error

func (*GenericEvent) SavePayload

func (e *GenericEvent) SavePayload() error

type KafkaProvider

type KafkaProvider struct {
	// contains filtered or unexported fields
}

KafkaProvider implements the Provider interface for Kafka

func (*KafkaProvider) FetchAllEvents

func (p *KafkaProvider) FetchAllEvents(batch int) (<-chan []Event, error)

FetchAllEvents gets all events from all partitions of the specified topic

func (*KafkaProvider) SendEvents

func (p *KafkaProvider) SendEvents(events []Event) error

SendEvents puts messages on the Kafka topic

type MemoryRepository

type MemoryRepository struct {
	// contains filtered or unexported fields
}

MemoryRepository is a basic implementation of Repository which keeps data in memory. This struct is meant to be inherited (embedded) by your own Repository. The inheriting implementation should contain dedicated methods for each needed event type

func NewMemoryRepository

func NewMemoryRepository() *MemoryRepository

NewMemoryRepository creates an empty, initialized instance

func (*MemoryRepository) AddNewEvent

func (r *MemoryRepository) AddNewEvent(event Event)

AddNewEvent adds a newly created event to the uncommitted list of events

func (*MemoryRepository) AddOrModifyEntity

func (r *MemoryRepository) AddOrModifyEntity(entity Entity)

AddOrModifyEntity just sets a new entity in the collection of entities. An existing entity will be replaced if the id already exists

func (*MemoryRepository) GetEntity

func (r *MemoryRepository) GetEntity(id string) (Entity, error)

GetEntity returns the current entity state for the provided id

func (*MemoryRepository) GetUncommitedChanges

func (r *MemoryRepository) GetUncommitedChanges() []Event

GetUncommitedChanges gets the new events which were created by the state-changing methods

func (*MemoryRepository) InitProvider

func (r *MemoryRepository) InitProvider(provider Provider, child Repository) error

InitProvider sets the event store provider on the repository and starts restoring entities

func (*MemoryRepository) Replay

func (r *MemoryRepository) Replay(events []Event) error

Replay updates the state of one or more entities from the provided events. This method should be overridden by the real implementation with update cases for each event type

func (*MemoryRepository) Save

func (r *MemoryRepository) Save(events []Event) error

Save events - to be honest, it just sends the events to the bus provider

type Provider

type Provider interface {
	FetchAllEvents(batch int) (<-chan []Event, error)
	SendEvents(events []Event) error
}

Provider interface

func NewKafkaProvider

func NewKafkaProvider(topic string, groupName string, servers string) Provider

NewKafkaProvider creates a new instance of the provider

type Repository

type Repository interface {
	InitProvider(provider Provider, child Repository) error // ??
	AddOrModifyEntity(entity Entity)
	GetEntity(id string) (Entity, error)
	Replay(events []Event) error
	AddNewEvent(event Event)
	GetUncommitedChanges() []Event
	Save(events []Event) error
}

Repository is an abstraction over the database which keeps all entities (aggregators) in their latest state
