Documentation ¶
Index ¶
- func GetKafkaAggregateTypeTopic(cfg KafkaEventsBusConfig, aggregateType string) kafka.TopicConfig
- func GetTopicName(eventStorePrefix string, aggregateType string) string
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) Apply(event any) error
- func (a *AggregateBase) ClearChanges()
- func (a *AggregateBase) GetChanges() []any
- func (a *AggregateBase) GetID() string
- func (a *AggregateBase) GetType() AggregateType
- func (a *AggregateBase) GetVersion() uint64
- func (a *AggregateBase) Load(events []any) error
- func (a *AggregateBase) RaiseEvent(event any) error
- func (a *AggregateBase) SetID(id string) *AggregateBase
- func (a *AggregateBase) SetType(aggregateType AggregateType)
- func (a *AggregateBase) String() string
- func (a *AggregateBase) ToSnapshot()
- type AggregateRoot
- type AggregateType
- type Apply
- type Event
- func (e *Event) GetAggregateID() string
- func (e *Event) GetAggregateType() AggregateType
- func (e *Event) GetData() []byte
- func (e *Event) GetEventID() string
- func (e *Event) GetEventType() EventType
- func (e *Event) GetJsonData(data interface{}) error
- func (e *Event) GetJsonMetadata(metaData interface{}) error
- func (e *Event) GetMetadata() []byte
- func (e *Event) GetString() string
- func (e *Event) GetTimeStamp() time.Time
- func (e *Event) GetVersion() uint64
- func (e *Event) SetAggregateType(aggregateType AggregateType)
- func (e *Event) SetData(data []byte) *Event
- func (e *Event) SetJsonData(data interface{}) error
- func (e *Event) SetMetadata(metaData interface{}) error
- func (e *Event) SetVersion(aggregateVersion uint64)
- func (e *Event) String() string
- type EventType
- type KafkaEventsBus
- type KafkaEventsBusConfig
- type Load
- type RaiseEvent
- type When
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetKafkaAggregateTypeTopic ¶
func GetKafkaAggregateTypeTopic(cfg KafkaEventsBusConfig, aggregateType string) kafka.TopicConfig
func GetTopicName ¶
func GetTopicName(eventStorePrefix string, aggregateType string) string
Types ¶
type Aggregate ¶
type Aggregate interface {
	When
	AggregateRoot
	RaiseEvent
}
type AggregateBase ¶
type AggregateBase struct {
	ID      string
	Version uint64
	Changes []any
	Type    AggregateType
	// contains filtered or unexported fields
}
AggregateBase is the base aggregate. It contains all the main fields an aggregate needs.
func NewAggregateBase ¶
func NewAggregateBase(when when) *AggregateBase
NewAggregateBase is the AggregateBase constructor. The base contains all the main fields and methods; the concrete aggregate must implement the When interface and pass its When method as the argument to the constructor. Example of a recommended aggregate constructor method:
	func NewOrderAggregate() *OrderAggregate {
		orderAggregate := &OrderAggregate{
			Order: models.NewOrder(),
		}
		base := es.NewAggregateBase(orderAggregate.When)
		base.SetType(OrderAggregateType)
		orderAggregate.AggregateBase = base
		return orderAggregate
	}
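The wiring in the constructor above can be tried without the package itself. The following is a minimal sketch, not the library's implementation: `base` is a hypothetical stand-in for AggregateBase that keeps only the callback and the uncommitted changes, and `OrderCreated` and `OrderPaid` are hypothetical event payloads. It shows how passing the aggregate's own When method lets the base mutate the embedding aggregate's state.

```go
package main

import (
	"errors"
	"fmt"
)

// base is a hypothetical stand-in for es.AggregateBase: it stores only the
// callback and the list of uncommitted changes.
type base struct {
	when    func(event any) error
	Changes []any
}

// Apply runs the callback and, on success, records the event as uncommitted.
func (b *base) Apply(event any) error {
	if err := b.when(event); err != nil {
		return err
	}
	b.Changes = append(b.Changes, event)
	return nil
}

// OrderCreated and OrderPaid are hypothetical event payloads.
type OrderCreated struct{ ItemsCount int }
type OrderPaid struct{}

// OrderAggregate embeds the base and keeps its own state.
type OrderAggregate struct {
	*base
	ItemsCount int
	Paid       bool
}

// When mutates the aggregate state according to the concrete event type.
func (a *OrderAggregate) When(event any) error {
	switch e := event.(type) {
	case OrderCreated:
		a.ItemsCount = e.ItemsCount
	case OrderPaid:
		a.Paid = true
	default:
		return errors.New("unknown event type")
	}
	return nil
}

// NewOrderAggregate wires the aggregate's own When method into the base.
func NewOrderAggregate() *OrderAggregate {
	a := &OrderAggregate{}
	a.base = &base{when: a.When}
	return a
}

func main() {
	order := NewOrderAggregate()
	if err := order.Apply(OrderCreated{ItemsCount: 3}); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	fmt.Println(order.ItemsCount, order.Paid, len(order.Changes))
}
```

Because `a.When` is a method value bound to the aggregate pointer, the base never needs to know the concrete aggregate type; unknown events are rejected by the type switch and are not recorded as changes.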
func (*AggregateBase) Apply ¶
func (a *AggregateBase) Apply(event any) error
Apply pushes the event to the aggregate's uncommitted events using the When method.
func (*AggregateBase) ClearChanges ¶
func (a *AggregateBase) ClearChanges()
ClearChanges clears the uncommitted events of the AggregateBase.
func (*AggregateBase) GetChanges ¶
func (a *AggregateBase) GetChanges() []any
GetChanges returns the uncommitted events of the AggregateBase.
func (*AggregateBase) GetType ¶
func (a *AggregateBase) GetType() AggregateType
GetType returns the AggregateType of the AggregateBase.
func (*AggregateBase) GetVersion ¶
func (a *AggregateBase) GetVersion() uint64
GetVersion returns the version of the AggregateBase.
func (*AggregateBase) Load ¶
func (a *AggregateBase) Load(events []any) error
Load adds existing events from the event store to the aggregate using the When interface method.
func (*AggregateBase) RaiseEvent ¶
func (a *AggregateBase) RaiseEvent(event any) error
RaiseEvent pushes the event to the aggregate's applied events using the When method. It is used when loading directly from the event store.
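The difference between replaying stored events and applying new ones can be sketched as follows. This is a hedged illustration under stated assumptions, not the package's code: `base` is a hypothetical stand-in for AggregateBase, the version is assumed to advance by one per handled event, and `account` and `deposited` are hypothetical names. RaiseEvent is left out because its version handling is not described here.

```go
package main

import "fmt"

// base is a hypothetical stand-in for es.AggregateBase.
type base struct {
	Version uint64
	Changes []any
	when    func(event any) error
}

// Load replays stored events: state and version advance, but nothing is
// recorded as uncommitted because the events are already persisted.
func (b *base) Load(events []any) error {
	for _, e := range events {
		if err := b.when(e); err != nil {
			return err
		}
		b.Version++ // assumption: one version step per event
	}
	return nil
}

// Apply handles a new event and records it as an uncommitted change.
func (b *base) Apply(event any) error {
	if err := b.when(event); err != nil {
		return err
	}
	b.Version++ // assumption: one version step per event
	b.Changes = append(b.Changes, event)
	return nil
}

// ClearChanges drops the uncommitted events, e.g. after they were saved.
func (b *base) ClearChanges() { b.Changes = nil }

// deposited is a hypothetical event payload.
type deposited struct{ Amount int }

// account is a hypothetical aggregate embedding the stand-in base.
type account struct {
	base
	Balance int
}

// onEvent plays the role of the When method.
func (a *account) onEvent(event any) error {
	d, ok := event.(deposited)
	if !ok {
		return fmt.Errorf("unexpected event %T", event)
	}
	a.Balance += d.Amount
	return nil
}

func newAccount() *account {
	a := &account{}
	a.base.when = a.onEvent
	return a
}

func main() {
	acc := newAccount()
	history := []any{deposited{Amount: 10}, deposited{Amount: 5}}
	if err := acc.Load(history); err != nil {
		fmt.Println("load failed:", err)
		return
	}
	if err := acc.Apply(deposited{Amount: 7}); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	fmt.Println(acc.Balance, acc.Version, len(acc.Changes))
	acc.ClearChanges()
	fmt.Println(len(acc.Changes))
}
```

In this sketch only the event passed to Apply ends up in Changes, which is what a repository would persist before calling ClearChanges.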
func (*AggregateBase) SetID ¶
func (a *AggregateBase) SetID(id string) *AggregateBase
SetID sets the ID of the AggregateBase.
func (*AggregateBase) SetType ¶
func (a *AggregateBase) SetType(aggregateType AggregateType)
SetType sets the AggregateType of the AggregateBase.
func (*AggregateBase) String ¶
func (a *AggregateBase) String() string
func (*AggregateBase) ToSnapshot ¶
func (a *AggregateBase) ToSnapshot()
ToSnapshot prepares the AggregateBase for saving a Snapshot.
type AggregateRoot ¶
type AggregateRoot interface {
	GetID() string
	SetID(id string) *AggregateBase
	GetType() AggregateType
	SetType(aggregateType AggregateType)
	GetChanges() []any
	ClearChanges()
	GetVersion() uint64
	ToSnapshot()
	String() string
	Load
	Apply
	RaiseEvent
}
AggregateRoot contains all methods of AggregateBase
type Event ¶
type Event struct {
	EventID       string
	AggregateID   string
	EventType     EventType
	AggregateType AggregateType
	Version       uint64
	Data          []byte
	Metadata      []byte
	Timestamp     time.Time
}
Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the database are represented by each database's internal event type, which implements Event.
func NewBaseEvent ¶
NewBaseEvent is the base Event constructor. It returns an Event with the EventID, Aggregate properties and Timestamp configured.
func (*Event) GetAggregateID ¶
func (e *Event) GetAggregateID() string
GetAggregateID returns the AggregateID of the Aggregate that the Event belongs to.
func (*Event) GetAggregateType ¶
func (e *Event) GetAggregateType() AggregateType
GetAggregateType returns the AggregateType that the Event can be applied to.
func (*Event) GetEventType ¶
func (e *Event) GetEventType() EventType
GetEventType returns the EventType of the event.
func (*Event) GetJsonData ¶
func (e *Event) GetJsonData(data interface{}) error
GetJsonData unmarshals the JSON data attached to the Event into data.
func (*Event) GetJsonMetadata ¶
func (e *Event) GetJsonMetadata(metaData interface{}) error
GetJsonMetadata unmarshals the app-specific metadata, serialized as JSON, of the Event into metaData.
func (*Event) GetMetadata ¶
func (e *Event) GetMetadata() []byte
GetMetadata returns app-specific metadata, such as the request AggregateID or the originating user.
func (*Event) GetTimeStamp ¶
func (e *Event) GetTimeStamp() time.Time
GetTimeStamp returns the timestamp of the Event.
func (*Event) GetVersion ¶
func (e *Event) GetVersion() uint64
GetVersion returns the version of the Aggregate after the Event has been applied.
func (*Event) SetAggregateType ¶
func (e *Event) SetAggregateType(aggregateType AggregateType)
SetAggregateType sets the AggregateType that the Event can be applied to.
func (*Event) SetJsonData ¶
func (e *Event) SetJsonData(data interface{}) error
SetJsonData serializes data to JSON and sets it as the data attached to the Event.
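The JSON round trip that SetJsonData and GetJsonData describe can be sketched with the standard library. This is a minimal illustration assuming the methods wrap `encoding/json`; `event`, `setJSONData`, `getJSONData` and `orderCreated` are hypothetical names, not the package's API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for es.Event holding only the payload.
type event struct {
	Data []byte
}

// setJSONData marshals data to JSON and stores the bytes on the event.
func (e *event) setJSONData(data any) error {
	b, err := json.Marshal(data)
	if err != nil {
		return err
	}
	e.Data = b
	return nil
}

// getJSONData unmarshals the stored bytes into data, which must be a pointer.
func (e *event) getJSONData(data any) error {
	return json.Unmarshal(e.Data, data)
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	OrderID string `json:"orderId"`
	Total   int    `json:"total"`
}

func main() {
	var e event
	if err := e.setJSONData(orderCreated{OrderID: "o-1", Total: 42}); err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(e.Data))

	var out orderCreated
	if err := e.getJSONData(&out); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(out.OrderID, out.Total)
}
```

The caller passes a pointer to the getter, as with `json.Unmarshal`; passing a non-pointer value returns an error instead of filling the struct.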
func (*Event) SetMetadata ¶
func (e *Event) SetMetadata(metaData interface{}) error
SetMetadata serializes app-specific metadata to JSON and sets it on the Event.
func (*Event) SetVersion ¶
func (e *Event) SetVersion(aggregateVersion uint64)
SetVersion sets the version of the Aggregate.
type EventType ¶
type EventType string
EventType is the type of any event, used as its unique identifier.
type KafkaEventsBus ¶
type KafkaEventsBus struct {
	// contains filtered or unexported fields
}
func NewKafkaEventsBus ¶
func NewKafkaEventsBus(producer kafkaClient.Producer, cfg KafkaEventsBusConfig) *KafkaEventsBus
NewKafkaEventsBus is the KafkaEventsBus constructor.
func (*KafkaEventsBus) ProcessEvents ¶
func (e *KafkaEventsBus) ProcessEvents(ctx context.Context, events []Event) error
ProcessEvents serializes es.Events to JSON and publishes them to the Kafka topic.
type KafkaEventsBusConfig ¶
type KafkaEventsBusConfig struct {
	Topic             string `mapstructure:"topic" validate:"required"`
	TopicPrefix       string `mapstructure:"topicPrefix" validate:"required"`
	Partitions        int    `mapstructure:"partitions" validate:"required,gte=0"`
	ReplicationFactor int    `mapstructure:"replicationFactor" validate:"required,gte=0"`
	Headers           []kafka.Header
}
KafkaEventsBusConfig is the configuration for the Kafka events bus.
type RaiseEvent ¶
RaiseEvent processes an applied Aggregate Event from the event store.