dogma

package module
v0.13.0
Published: Mar 25, 2024 License: MIT Imports: 3 Imported by: 32

README

Dogma

Build message-based applications in Go.

Overview

Dogma is a toolkit for building message-based applications in Go.

In Dogma, an application implements business logic by consuming and producing messages. The application is strictly separated from the engine, which handles message delivery and data persistence.

Features

  • Built for Domain Driven Design: The API uses DDD terminology to help developers align their understanding of the application's business logic with its implementation.

  • Flexible message format: Supports any Go type that can be serialized as a byte slice, with built-in support for JSON and Protocol Buffers.

  • First-class testing: Dogma's testkit module runs isolated behavioral tests of your application.

  • Engine-agnostic applications: Choose the engine with the best messaging and persistence semantics for your application.

  • Built-in introspection: Analyze application code to visualize how messages traverse your applications.

Related repositories

  • testkit: Utilities for black-box testing of Dogma applications.
  • projectionkit: Utilities for building projections in various popular database systems.
  • example: An example Dogma application that implements basic banking features.

Concepts

Dogma leans heavily on the concepts of Domain Driven Design. It's designed to provide a suitable platform for applications that make use of design patterns such as Command/Query Responsibility Segregation (CQRS), Event Sourcing and Eventual Consistency.

The following concepts are core to Dogma's design, and should be well understood by any developer wishing to build an application:

Message

A message is a data structure that represents a command, event or timeout within an application.

A command is a request to make a single atomic change to the application's state. An event indicates that the state has changed in some way. A single command can produce any number of events, including zero.

A timeout helps model business logic that depends on the passage of time.

Messages must implement the appropriate interface: Command, Event or Timeout. These interfaces serve as aliases for dogma.Message, but may diverge in the future.

Message handler

A message handler is part of an application that acts upon messages it receives.

Each handler specifies the message types it expects to receive. The engine routes these messages to the handler.

Command messages are always routed to a single handler. Event messages may be routed to any number of handlers, including zero. Timeout messages are always routed back to the handler that produced them.

Dogma defines four handler types, one each for aggregates, processes, integrations and projections. These concepts are described in more detail below.

Application

An application is a collection of message handlers that work together as a unit. Typically, each application encapsulates a specific business (sub-)domain or "bounded-context".

Engine

An engine is a Go module that delivers messages to an application and persists the application's state.

A Dogma application can run on any Dogma engine. The choice of engine brings with it a set of guarantees about how the application behaves, for example:

  • Consistency: Different engines may provide different levels of consistency guarantees, such as immediate consistency or eventual consistency.

  • Message delivery: One engine may deliver messages in the same order that they were produced, while another may process messages out of order or in batches.

  • Persistence: The engine may offer a choice of persistence mechanisms for application state, such as in-memory, on-disk, or in a remote database.

  • Data model: The engine may provide a choice of data models for application state, such as relational or document-oriented.

  • Scalability: The engine may provide a choice of scalability models, such as single-node or multi-node.

This repository is not itself an engine implementation. It defines the API that engines and applications use to interact.

One example of a Dogma engine is Veracity.

Aggregate

An aggregate is an entity that encapsulates a specific part of an application's business logic and its associated state. Each instance of an aggregate represents a unique occurrence of that entity within the application.

Each aggregate has an associated implementation of the dogma.AggregateMessageHandler interface. The engine routes command messages to the handler to change the state of specific instances. Such changes are represented by event messages.

An important responsibility of an aggregate is to enforce the invariants of the business domain. These are the rules that must hold true at all times. For example, in a hypothetical banking system, an aggregate representing a customer's account balance must ensure that the balance never goes below zero.

The engine manages each aggregate instance's state. State changes are "immediately consistent", meaning that the changes made by one command are always visible to future commands routed to the same instance.

Aggregates can be a difficult concept to grasp. The book Domain Driven Design Distilled, by Vaughn Vernon, offers a suitable introduction to aggregates and the other elements of domain driven design.

Process

A process automates a long-running business process. In particular, processes can coordinate changes across multiple aggregate instances, or between aggregates and integrations.

Like aggregates, processes encapsulate related logic and state. Each instance of a process represents a unique occurrence of that process within the application.

Each process has an associated implementation of the dogma.ProcessMessageHandler interface. The engine routes event messages to the handler, which produces commands to execute.

A process may use timeout messages to model business processes with time-based logic. The engine always routes timeout messages back to the process instance that produced them.

Processes use command messages to make changes to the application's state. Because each command represents a separate atomic change, the results of a process are "eventually consistent".

Integration

An integration is a message handler that interacts with some external non-message-based system.

Each integration is an implementation of the dogma.IntegrationMessageHandler interface. The engine routes command messages to the handler which interacts with some external system(s). Integrations may optionally produce event messages that represent the results of their interactions.

Integrations are stateless from the perspective of the engine.

Projection

A projection builds a partial view of the application's state from the events that occur.

Each projection is an implementation of the dogma.ProjectionMessageHandler interface. The engine routes event messages to the handler which typically updates a read-optimized database of some kind. This view is often referred to as a "read model" or "query model".

The projectionkit module provides engine-agnostic tools for building projections in various popular database systems, such as PostgreSQL, MySQL, DynamoDB and others.

Documentation

Overview

Package dogma contains the API used by a Dogma application to interact with a Dogma engine.

The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in RFC 2119.

Index

Constants

This section is empty.

Variables

var UnexpectedMessage unexpectedMessage

UnexpectedMessage is a panic value used by a message handler when it receives a message of a type that it did not expect.
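
As a minimal sketch of how a handler might use this value, the program below panics with the sentinel from the default branch of a type switch. To stay self-contained it declares a local stand-in for UnexpectedMessage rather than importing the package. The Deposit and Withdraw types and the handle and panicsWithUnexpectedMessage helpers are hypothetical.

```go
package main

import "fmt"

// unexpectedMessage and UnexpectedMessage are local stand-ins for the value
// exported by the dogma package, declared here so the sketch compiles alone.
type unexpectedMessage struct{}

var UnexpectedMessage unexpectedMessage

// Deposit and Withdraw are hypothetical message types.
type (
	Deposit  struct{ Amount int }
	Withdraw struct{ Amount int }
)

// handle accepts only Deposit messages and panics with UnexpectedMessage for
// anything else.
func handle(m any) string {
	switch x := m.(type) {
	case Deposit:
		return fmt.Sprintf("deposited %d", x.Amount)
	default:
		panic(UnexpectedMessage)
	}
}

// panicsWithUnexpectedMessage reports whether handle(m) panicked with the
// sentinel value.
func panicsWithUnexpectedMessage(m any) (result bool) {
	defer func() {
		result = recover() == UnexpectedMessage
	}()
	handle(m)
	return false
}

func main() {
	fmt.Println(handle(Deposit{Amount: 10}))
	fmt.Println(panicsWithUnexpectedMessage(Withdraw{Amount: 5}))
}
```

Panicking with a single well-known value lets the caller tell a routing mistake apart from other failures by comparing the recovered value.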

Functions

This section is empty.

Types

type AggregateCommandScope

type AggregateCommandScope interface {
	// InstanceID returns the ID of the aggregate instance.
	InstanceID() string

	// RecordEvent records the occurrence of an event.
	//
	// It applies the event to the root such that the applied changes are
	// visible to the handler after this method returns.
	//
	// Recording an event cancels any prior call to Destroy() on this scope.
	RecordEvent(Event)

	// Destroy signals destruction of the aggregate instance.
	//
	// Destroying an aggregate instance discards its state. The first command to target a
	// destroyed instance operates on a new root.
	//
	// Destruction occurs once the HandleCommand() method returns. Any future
	// call to RecordEvent() on this scope prevents destruction.
	//
	// The precise destruction semantics are engine defined. For example,
	// event-sourcing engines typically do not destroy the record of the
	// aggregate's historical events.
	Destroy()

	// Log records an informational message.
	Log(format string, args ...any)
}

AggregateCommandScope performs engine operations within the context of a call to the HandleCommand() method of an AggregateMessageHandler.
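
The interplay between RecordEvent() and Destroy() can be illustrated with a hypothetical in-memory scope such as a unit test might use. This is only a sketch: fakeScope is not part of Dogma or testkit, Event is a local stand-in, and the sketch does not apply recorded events to a root.

```go
package main

import "fmt"

// Event is a local stand-in for dogma.Event.
type Event interface{}

// fakeScope is a hypothetical in-memory scope that mimics the documented
// interplay between RecordEvent() and Destroy().
type fakeScope struct {
	id        string
	events    []Event
	destroyed bool
}

// InstanceID returns the ID of the aggregate instance.
func (s *fakeScope) InstanceID() string { return s.id }

// RecordEvent stores the event and cancels any earlier call to Destroy().
func (s *fakeScope) RecordEvent(e Event) {
	s.events = append(s.events, e)
	s.destroyed = false
}

// Destroy marks the instance for destruction once the handler returns.
func (s *fakeScope) Destroy() { s.destroyed = true }

// Log prints an informational message.
func (s *fakeScope) Log(format string, args ...any) {
	fmt.Printf(format+"\n", args...)
}

func main() {
	s := &fakeScope{id: "account-1"}
	s.Destroy()
	s.RecordEvent("AccountReopened") // cancels the pending destruction
	s.Log("instance %s destroyed: %t", s.InstanceID(), s.destroyed)
}
```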

type AggregateConfigurer

type AggregateConfigurer interface {
	// Identity configures the handler's identity.
	//
	// n is a short human-readable name. It MUST be unique within the
	// application at any given time, but MAY change over the handler's
	// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
	// It must be between 1 and 255 bytes (not characters) in length.
	//
	// k is a unique key used to associate engine state with the handler. The
	// key SHOULD NOT change over the handler's lifetime. k MUST be an RFC 4122
	// UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
	//
	// Use of hard-coded literals for both values is RECOMMENDED.
	Identity(n string, k string)

	// Routes configures the engine to route certain message types to and from
	// the handler.
	//
	// Aggregate handlers support the HandlesCommand() and RecordsEvent() route
	// types.
	Routes(...AggregateRoute)
}

An AggregateConfigurer configures the engine for use with a specific aggregate message handler.
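
The constraints on the arguments to Identity() can be expressed as a small check. The sketch below is an assumption about how such validation could look, not a description of what any engine actually does. validateIdentity and uuidPattern are hypothetical names, and the pattern only checks the canonical text layout of a UUID.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"unicode"
	"unicode/utf8"
)

// uuidPattern matches the canonical 8-4-4-4-12 hexadecimal form of a UUID.
var uuidPattern = regexp.MustCompile(
	`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`,
)

// validateIdentity is a hypothetical helper that checks a name and key
// against the rules documented for Identity().
func validateIdentity(n, k string) error {
	// The length limit is expressed in bytes, which is what len() reports.
	if len(n) < 1 || len(n) > 255 {
		return errors.New("name must be between 1 and 255 bytes long")
	}
	if !utf8.ValidString(n) {
		return errors.New("name must be valid UTF-8")
	}
	for _, r := range n {
		// unicode.IsPrint treats the ASCII space as printable, so spaces
		// need a separate check.
		if unicode.IsSpace(r) || !unicode.IsPrint(r) {
			return fmt.Errorf("name contains disallowed character %q", r)
		}
	}
	if !uuidPattern.MatchString(k) {
		return errors.New("key must be a UUID in canonical text form")
	}
	return nil
}

func main() {
	const key = "5195fe85-eb3f-4121-84b0-be72cbc5722f"
	fmt.Println(validateIdentity("account", key))
	fmt.Println(validateIdentity("bank account", key))
}
```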

type AggregateMessageHandler

type AggregateMessageHandler interface {
	// Configure describes the handler's configuration to the engine.
	Configure(AggregateConfigurer)

	// New returns an aggregate root instance in its initial state.
	//
	// The return value MUST NOT be nil. It MAY be the zero-value of the root's
	// underlying type.
	//
	// Each call SHOULD return the same type and initial state.
	New() AggregateRoot

	// RouteCommandToInstance returns the ID of the instance that handles a
	// specific command.
	//
	// The return value MUST NOT be empty. RFC 4122 UUIDs are the RECOMMENDED
	// format for instance IDs.
	RouteCommandToInstance(Command) string

	// HandleCommand executes business logic in response to a command.
	//
	// The handler inspects the root to determine which events to record, if
	// any.
	//
	// The handler SHOULD NOT have any side-effects beyond recording events.
	// Specifically, the implementation MUST NOT modify the root directly. Use
	// AggregateCommandScope.RecordEvent() to record an event that represents
	// the state change. See also AggregateRoot.ApplyEvent().
	//
	// If this is the first command routed to this instance, the root is the
	// return value of New(). Otherwise, it's the value of the root as it
	// existed after handling the previous command.
	//
	// While the engine MAY call this method concurrently from separate
	// goroutines or operating system processes, the state changes and events
	// that represent them always appear to have occurred sequentially.
	HandleCommand(AggregateRoot, AggregateCommandScope, Command)
}

An AggregateMessageHandler models business logic and state.

Aggregates are the primary building blocks of an application's domain logic. They enforce the domain's strict invariants.

Aggregates use Command messages to represent requests to perform some specific business logic and change the state of the application accordingly. Event messages represent those changes.

Aggregates are stateful. An application typically uses multiple instances of an aggregate, each with its own state. For example, a banking application may use one instance of the "account" aggregate for each bank account.

The state of each instance is application-defined. Often it's a tree of related entities and values. The AggregateRoot interface represents the "root" entity through which the handler accesses the instance's state.
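
The banking example can be sketched as follows. This is an illustration under simplifying assumptions, not Dogma's own code: Command and Event are reduced to empty interfaces, scope is only the RecordEvent() part of AggregateCommandScope, handleCommand takes the concrete root type instead of AggregateRoot, and all message types and the recorder are hypothetical.

```go
package main

import "fmt"

// Simplified local stand-ins. The real dogma.Command and dogma.Event also
// require MessageDescription() and Validate() methods.
type (
	Command any
	Event   any
)

// scope is the subset of AggregateCommandScope used by this sketch.
type scope interface {
	RecordEvent(Event)
}

// Hypothetical messages for a banking domain.
type (
	Deposit            struct{ Amount int }
	Withdraw           struct{ Amount int }
	Deposited          struct{ Amount int }
	Withdrawn          struct{ Amount int }
	WithdrawalDeclined struct{ Amount int }
)

// account is the aggregate root. Only ApplyEvent modifies it.
type account struct{ Balance int }

func (a *account) ApplyEvent(e Event) {
	switch x := e.(type) {
	case Deposited:
		a.Balance += x.Amount
	case Withdrawn:
		a.Balance -= x.Amount
	}
}

// handleCommand mirrors HandleCommand(). It reads the root to enforce the
// "balance never goes below zero" invariant and changes state only by
// recording events.
func handleCommand(r *account, s scope, c Command) {
	switch x := c.(type) {
	case Deposit:
		s.RecordEvent(Deposited{Amount: x.Amount})
	case Withdraw:
		if x.Amount > r.Balance {
			s.RecordEvent(WithdrawalDeclined{Amount: x.Amount})
			return
		}
		s.RecordEvent(Withdrawn{Amount: x.Amount})
	}
}

// recorder is a hypothetical scope that applies each recorded event to the
// root, as documented for RecordEvent().
type recorder struct {
	root   *account
	events []Event
}

func (r *recorder) RecordEvent(e Event) {
	r.root.ApplyEvent(e)
	r.events = append(r.events, e)
}

func main() {
	root := &account{}
	s := &recorder{root: root}
	handleCommand(root, s, Deposit{Amount: 100})
	handleCommand(root, s, Withdraw{Amount: 150}) // declined: insufficient funds
	handleCommand(root, s, Withdraw{Amount: 40})
	fmt.Println("balance:", root.Balance, "events:", len(s.events))
}
```

Recording a WithdrawalDeclined event instead of returning an error keeps the outcome visible to other handlers, which is one common design choice rather than a requirement of the API.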

type AggregateRoot

type AggregateRoot interface {
	// ApplyEvent updates the aggregate instance to reflect the occurrence of an
	// event.
	//
	// The implementation of this method is the only code permitted to
	// modify the instance's state.
	//
	// The method SHOULD accept historical events that are no longer routed to
	// this aggregate type. This is typically required by event-sourcing engines
	// that sometimes load aggregates into memory by applying their entire
	// history.
	ApplyEvent(Event)
}

AggregateRoot is an interface for the domain-specific state of a specific aggregate instance.
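
The recommendation to accept historical events can be sketched with a root that still understands an event type it no longer records. The replay helper imitates, in a very reduced form, how an event-sourcing engine might rebuild state. The event types, including LegacyFeeCharged, and the replay function are hypothetical, and Event is a simplified local stand-in.

```go
package main

import "fmt"

// Event is a simplified local stand-in for dogma.Event.
type Event any

// AggregateRoot mirrors the interface documented above.
type AggregateRoot interface {
	ApplyEvent(Event)
}

// Hypothetical events. LegacyFeeCharged represents an event type that the
// aggregate no longer records but that may still exist in its history.
type (
	Deposited        struct{ Amount int }
	Withdrawn        struct{ Amount int }
	LegacyFeeCharged struct{ Amount int }
)

// account is a hypothetical aggregate root.
type account struct{ Balance int }

// ApplyEvent keeps handling LegacyFeeCharged so that old histories still
// produce the correct balance. Unknown events are ignored.
func (a *account) ApplyEvent(e Event) {
	switch x := e.(type) {
	case Deposited:
		a.Balance += x.Amount
	case Withdrawn, LegacyFeeCharged:
		a.Balance -= amountOf(x)
	}
}

// amountOf extracts the amount from the events that reduce the balance.
func amountOf(e Event) int {
	switch x := e.(type) {
	case Withdrawn:
		return x.Amount
	case LegacyFeeCharged:
		return x.Amount
	}
	return 0
}

// replay rebuilds a root by applying its entire history in order.
func replay(r AggregateRoot, history []Event) {
	for _, e := range history {
		r.ApplyEvent(e)
	}
}

func main() {
	root := &account{}
	replay(root, []Event{
		Deposited{Amount: 100},
		LegacyFeeCharged{Amount: 5},
		Withdrawn{Amount: 20},
	})
	fmt.Println("balance:", root.Balance)
}
```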

type AggregateRoute added in v0.12.0

type AggregateRoute interface {
	Route
	// contains filtered or unexported methods
}

AggregateRoute describes a message type that's routed to or from an AggregateMessageHandler.

type Application

type Application interface {
	// Configure describes the application's configuration to the engine.
	Configure(ApplicationConfigurer)
}

An Application is a collection of message handlers that model a single logical business domain.

type ApplicationConfigurer

type ApplicationConfigurer interface {
	// Identity configures the application's identity.
	//
	// n is a short human-readable name. It MAY change over the application's
	// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
	// It must be between 1 and 255 bytes (not characters) in length.
	//
	// k is a unique key used to associate engine state with the application.
	// The key SHOULD NOT change over the application's lifetime. k MUST be an
	// RFC 4122 UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
	//
	// Use of hard-coded literals for both values is RECOMMENDED.
	Identity(n string, k string)

	// RegisterAggregate configures the engine to route messages for an
	// aggregate.
	RegisterAggregate(AggregateMessageHandler, ...RegisterAggregateOption)

	// RegisterProcess configures the engine to route messages for a process.
	RegisterProcess(ProcessMessageHandler, ...RegisterProcessOption)

	// RegisterIntegration configures the engine to route messages for an
	// integration.
	RegisterIntegration(IntegrationMessageHandler, ...RegisterIntegrationOption)

	// RegisterProjection configures the engine to route messages for a
	// projection.
	RegisterProjection(ProjectionMessageHandler, ...RegisterProjectionOption)
}

An ApplicationConfigurer configures the engine for use with a specific application.

type BroadcastProjectionDeliveryPolicy added in v0.12.0

type BroadcastProjectionDeliveryPolicy struct {
	// PrimaryFirst defers "secondary delivery" of events until after the
	// "primary delivery" has completed.
	PrimaryFirst bool
}

BroadcastProjectionDeliveryPolicy is a ProjectionDeliveryPolicy that delivers each event to all instances of the application.

type Command added in v0.12.0

type Command = Message

A Command is a message that represents a request for a Dogma application to perform some action.

type CommandExecutor

type CommandExecutor interface {
	// ExecuteCommand executes or enqueues a command.
	//
	// If it returns nil, the engine has guaranteed execution of the command.
	// Otherwise, it's the caller's responsibility to retry.
	//
	// The application SHOULD assume that the command is executed
	// asynchronously; it has not necessarily executed by the time the method
	// returns.
	ExecuteCommand(context.Context, Command, ...ExecuteCommandOption) error
}

A CommandExecutor executes a command from outside the context of any message handler.

The CommandExecutor is the primary way that code outside of the Dogma application interacts with the Dogma engine.
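
Because a non-nil error leaves retrying to the caller, code outside the application often wraps the executor in a retry loop. The following is one possible sketch, not a prescribed pattern. The local CommandExecutor omits the variadic options of the real interface, and executeWithRetry, flakyExecutor and runDemo are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Command is a simplified local stand-in for dogma.Command.
type Command any

// CommandExecutor is a local stand-in for the interface documented above,
// without the variadic ExecuteCommandOption parameter.
type CommandExecutor interface {
	ExecuteCommand(context.Context, Command) error
}

// executeWithRetry is a hypothetical helper that tries up to attempts times,
// waiting for delay between tries, and stops early if ctx is done.
func executeWithRetry(
	ctx context.Context, x CommandExecutor, c Command,
	attempts int, delay time.Duration,
) error {
	var err error
	for i := 0; i < attempts; i++ {
		if i > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(delay):
			}
		}
		if err = x.ExecuteCommand(ctx, c); err == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// flakyExecutor is a hypothetical executor that fails its first calls.
type flakyExecutor struct{ failures, calls int }

func (f *flakyExecutor) ExecuteCommand(context.Context, Command) error {
	f.calls++
	if f.calls <= f.failures {
		return errors.New("engine unavailable")
	}
	return nil
}

// runDemo executes one command against a flaky executor and reports how
// many calls were made and the final error, if any.
func runDemo(failures, attempts int) (int, error) {
	x := &flakyExecutor{failures: failures}
	err := executeWithRetry(context.Background(), x, "OpenAccount", attempts, time.Millisecond)
	return x.calls, err
}

func main() {
	calls, err := runDemo(2, 5)
	fmt.Println("calls:", calls, "error:", err)
}
```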

type Event added in v0.12.0

type Event = Message

An Event is a message that indicates that some action has occurred within a Dogma application.

type ExecuteCommandOption added in v0.13.0

type ExecuteCommandOption struct{}

ExecuteCommandOption is an option that affects the behavior of a call to the ExecuteCommand() method of the CommandExecutor interface.

type ExecutesCommandOption added in v0.13.0

type ExecutesCommandOption struct{}

ExecutesCommandOption is an option that affects the behavior of the route returned by ExecutesCommand.

type ExecutesCommandRoute added in v0.12.0

type ExecutesCommandRoute struct{ Type reflect.Type }

ExecutesCommandRoute describes a route for a handler that executes a Command of a specific type.

func ExecutesCommand added in v0.12.0

func ExecutesCommand[T Command](...ExecutesCommandOption) ExecutesCommandRoute

ExecutesCommand routes command messages produced by a ProcessMessageHandler.

It's used as an argument to the Routes() method of ProcessConfigurer.

type HandlesCommandOption added in v0.13.0

type HandlesCommandOption struct{}

HandlesCommandOption is an option that affects the behavior of the route returned by HandlesCommand.

type HandlesCommandRoute added in v0.12.0

type HandlesCommandRoute struct{ Type reflect.Type }

HandlesCommandRoute describes a route for a handler that handles a Command of a specific type.

func HandlesCommand added in v0.12.0

func HandlesCommand[T Command](...HandlesCommandOption) HandlesCommandRoute

HandlesCommand routes command messages to an AggregateMessageHandler or IntegrationMessageHandler.

It's used as an argument to the Routes() method of AggregateConfigurer or IntegrationConfigurer.

An application MUST NOT route a single command type to more than one handler.
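
The single-handler rule can be pictured as a check over the configured routes. The sketch below is an assumption about how a type parameter could be turned into the Type field and how an engine might detect duplicates. It is not Dogma's implementation. handlesCommand, handler, checkUniqueCommands and the message types are hypothetical, and HandlesCommandRoute is redeclared locally.

```go
package main

import (
	"fmt"
	"reflect"
)

// Command is a simplified local stand-in for dogma.Command.
type Command any

// HandlesCommandRoute mirrors the struct documented above.
type HandlesCommandRoute struct{ Type reflect.Type }

// handlesCommand is a local stand-in for HandlesCommand(), without options.
// Deriving Type from the type parameter this way is an assumption.
func handlesCommand[T Command]() HandlesCommandRoute {
	return HandlesCommandRoute{Type: reflect.TypeOf((*T)(nil)).Elem()}
}

// Hypothetical command types.
type (
	OpenAccount struct{}
	Deposit     struct{}
)

// handler is a hypothetical record of a handler's name and its routes.
type handler struct {
	Name   string
	Routes []HandlesCommandRoute
}

// checkUniqueCommands returns an error if any command type is routed to
// more than one handler.
func checkUniqueCommands(handlers []handler) error {
	owner := map[reflect.Type]string{}
	for _, h := range handlers {
		for _, r := range h.Routes {
			if prev, ok := owner[r.Type]; ok && prev != h.Name {
				return fmt.Errorf("%s is routed to both %s and %s", r.Type, prev, h.Name)
			}
			owner[r.Type] = h.Name
		}
	}
	return nil
}

func main() {
	handlers := []handler{
		{Name: "account", Routes: []HandlesCommandRoute{
			handlesCommand[OpenAccount](),
			handlesCommand[Deposit](),
		}},
		{Name: "ledger", Routes: []HandlesCommandRoute{
			handlesCommand[Deposit](),
		}},
	}
	fmt.Println(checkUniqueCommands(handlers))
}
```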

type HandlesEventOption added in v0.13.0

type HandlesEventOption struct{}

HandlesEventOption is an option that affects the behavior of the route returned by HandlesEvent.

type HandlesEventRoute added in v0.12.0

type HandlesEventRoute struct{ Type reflect.Type }

HandlesEventRoute describes a route for a handler that handles an Event of a specific type.

func HandlesEvent added in v0.12.0

func HandlesEvent[T Event](...HandlesEventOption) HandlesEventRoute

HandlesEvent routes event messages to a ProcessMessageHandler or ProjectionMessageHandler.

It's used as an argument to the Routes() method of ProcessConfigurer or ProjectionConfigurer.

type IntegrationCommandScope

type IntegrationCommandScope interface {
	// RecordEvent records the occurrence of an event.
	RecordEvent(Event)

	// Log records an informational message.
	Log(format string, args ...any)
}

IntegrationCommandScope performs engine operations within the context of a call to the HandleCommand() method of an IntegrationMessageHandler.

type IntegrationConfigurer

type IntegrationConfigurer interface {
	// Identity configures the handler's identity.
	//
	// n is a short human-readable name. It MUST be unique within the
	// application at any given time, but MAY change over the handler's
	// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
	// It must be between 1 and 255 bytes (not characters) in length.
	//
	// k is a unique key used to associate engine state with the handler. The
	// key SHOULD NOT change over the handler's lifetime. k MUST be an RFC 4122
	// UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
	//
	// Use of hard-coded literals for both values is RECOMMENDED.
	Identity(n string, k string)

	// Routes configures the engine to route certain message types to and from
	// the handler.
	//
	// Integration handlers support the HandlesCommand() and RecordsEvent()
	// route types.
	Routes(...IntegrationRoute)
}

An IntegrationConfigurer configures the engine for use with a specific integration message handler.

type IntegrationMessageHandler

type IntegrationMessageHandler interface {
	// Configure describes the handler's configuration to the engine.
	Configure(IntegrationConfigurer)

	// HandleCommand handles a command, typically by invoking some external API.
	//
	// It MAY optionally record events that describe the outcome of the command.
	//
	// The engine MAY call this method concurrently from separate goroutines or
	// operating system processes.
	//
	// The implementation SHOULD NOT impose a context deadline; implement the
	// TimeoutHint() method instead.
	HandleCommand(context.Context, IntegrationCommandScope, Command) error

	// TimeoutHint returns a suitable duration for handling the given message.
	//
	// The duration SHOULD be as short as possible. If no hint is available it
	// MUST be zero.
	//
	// In this context, "timeout" refers to a deadline, not a timeout message.
	TimeoutHint(Message) time.Duration
}

An IntegrationMessageHandler integrates a Dogma application with external and non-message-based systems.

The engine does not keep any state for integration handlers.
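
One way to picture the role of TimeoutHint() is an engine that turns the hint into a context deadline before calling HandleCommand(). How a real engine uses the hint is engine-defined, so the sketch below is only an assumption. The handler interface is a reduced stand-in without the scope argument, and dispatch, slowAPI, ms and timedOut are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Command is a simplified local stand-in for dogma.Command.
type Command any

// handler is a reduced stand-in for IntegrationMessageHandler.
type handler interface {
	HandleCommand(context.Context, Command) error
	TimeoutHint(Command) time.Duration
}

// dispatch is a hypothetical engine-side function. It uses the hint as the
// deadline and falls back to a default when the hint is zero.
func dispatch(ctx context.Context, h handler, c Command, fallback time.Duration) error {
	d := h.TimeoutHint(c)
	if d == 0 {
		d = fallback
	}
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()
	return h.HandleCommand(ctx, c)
}

// slowAPI is a hypothetical integration whose external call takes latency
// to complete. It imposes no deadline of its own.
type slowAPI struct{ latency, hint time.Duration }

func (s slowAPI) HandleCommand(ctx context.Context, _ Command) error {
	select {
	case <-time.After(s.latency):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (s slowAPI) TimeoutHint(Command) time.Duration { return s.hint }

// ms converts a number of milliseconds to a time.Duration.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// timedOut reports whether a call with the given latency and hint, both in
// milliseconds, exceeded its deadline.
func timedOut(latencyMS, hintMS int) bool {
	h := slowAPI{latency: ms(latencyMS), hint: ms(hintMS)}
	err := dispatch(context.Background(), h, "SendEmail", time.Second)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("fast call timed out:", timedOut(1, 500))
	fmt.Println("slow call timed out:", timedOut(500, 5))
}
```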

type IntegrationRoute added in v0.12.0

type IntegrationRoute interface {
	Route
	// contains filtered or unexported methods
}

IntegrationRoute describes a message type that's routed to or from an IntegrationMessageHandler.

type Message

type Message interface {
	// MessageDescription returns a human-readable description of the message.
	MessageDescription() string

	// Validate returns a non-nil error if the message is invalid.
	Validate() error
}

A Message is an application-defined unit of data that describes a Command, Event, or Timeout within a message-based application.
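
A message type satisfies this interface by providing both methods. The sketch below redeclares Message locally so that it compiles on its own. The Deposit command and its fields are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors the interface documented above.
type Message interface {
	MessageDescription() string
	Validate() error
}

// Deposit is a hypothetical command message.
type Deposit struct {
	AccountID string
	Amount    int // in cents
}

// MessageDescription returns a human-readable description of the message.
func (m Deposit) MessageDescription() string {
	return fmt.Sprintf("deposit %d cents into account %s", m.Amount, m.AccountID)
}

// Validate returns a non-nil error if the message is invalid.
func (m Deposit) Validate() error {
	if m.AccountID == "" {
		return errors.New("account ID must not be empty")
	}
	if m.Amount <= 0 {
		return errors.New("amount must be positive")
	}
	return nil
}

// Compile-time check that Deposit implements Message.
var _ Message = Deposit{}

func main() {
	m := Deposit{AccountID: "acc-1", Amount: 500}
	fmt.Println(m.MessageDescription())
	fmt.Println(m.Validate())
}
```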

type NoCompactBehavior added in v0.10.0

type NoCompactBehavior struct{}

NoCompactBehavior is an embeddable type for ProjectionMessageHandler implementations that do not require compaction.

func (NoCompactBehavior) Compact added in v0.10.0

Compact does nothing.

type NoTimeoutHintBehavior added in v0.5.0

type NoTimeoutHintBehavior struct{}

NoTimeoutHintBehavior is an embeddable type for ProcessMessageHandler, IntegrationMessageHandler and ProjectionMessageHandler implementations that do not provide a message handling timeout hint.

func (NoTimeoutHintBehavior) TimeoutHint added in v0.5.0

TimeoutHint always returns zero.

type NoTimeoutMessagesBehavior added in v0.5.0

type NoTimeoutMessagesBehavior struct{}

NoTimeoutMessagesBehavior is an embeddable type for ProcessMessageHandler implementations that do not use Timeout messages.

func (NoTimeoutMessagesBehavior) HandleTimeout added in v0.5.0

HandleTimeout panics with the UnexpectedMessage value.

type ProcessConfigurer

type ProcessConfigurer interface {
	// Identity configures the handler's identity.
	//
	// n is a short human-readable name. It MUST be unique within the
	// application at any given time, but MAY change over the handler's
	// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
	// It must be between 1 and 255 bytes (not characters) in length.
	//
	// k is a unique key used to associate engine state with the handler. The
	// key SHOULD NOT change over the handler's lifetime. k MUST be an RFC 4122
	// UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
	//
	// Use of hard-coded literals for both values is RECOMMENDED.
	Identity(n string, k string)

	// Routes configures the engine to route certain message types to and from
	// the handler.
	//
	// Process handlers support the HandlesEvent(), ExecutesCommand() and
	// SchedulesTimeout() route types.
	Routes(...ProcessRoute)
}

A ProcessConfigurer configures the engine for use with a specific process message handler.

type ProcessEventScope

type ProcessEventScope interface {
	// InstanceID returns the ID of the process instance.
	InstanceID() string

	// End signals the end of the process.
	//
	// Ending a process instance destroys its state and cancels any pending
	// timeouts.
	//
	// The process instance ends once HandleEvent() returns. Any future call to
	// ExecuteCommand() or ScheduleTimeout() on this scope prevents the process
	// from ending.
	//
	// "Re-beginning" a process instance that has ended has undefined behavior
	// and is NOT RECOMMENDED.
	End()

	// ExecuteCommand executes a command as a result of the event.
	//
	// Executing a command cancels any prior call to End() on this scope.
	ExecuteCommand(Command)

	// ScheduleTimeout schedules a timeout to occur at a specific time.
	//
	// Ending the process cancels any pending timeouts. Scheduling a timeout
	// cancels any prior call to End() on this scope.
	ScheduleTimeout(Timeout, time.Time)

	// RecordedAt returns the time at which the event occurred.
	RecordedAt() time.Time

	// Log records an informational message.
	Log(format string, args ...any)
}

ProcessEventScope performs engine operations within the context of a call to the HandleEvent() method of a ProcessMessageHandler.

type ProcessMessageHandler

type ProcessMessageHandler interface {
	// Configure describes the handler's configuration to the engine.
	Configure(ProcessConfigurer)

	// New returns a process root instance in its initial state.
	//
	// The return value MUST NOT be nil. It MAY be the zero-value of the root's
	// underlying type.
	//
	// Each call SHOULD return the same type and initial state.
	New() ProcessRoot

	// RouteEventToInstance returns the ID of the instance that handles a
	// specific event.
	//
	// If ok is false, the process ignores this event. Otherwise, id MUST NOT be
	// empty. RFC 4122 UUIDs are the RECOMMENDED format for instance IDs.
	//
	// A process instance begins the first time it receives an event.
	RouteEventToInstance(context.Context, Event) (id string, ok bool, err error)

	// HandleEvent begins or continues the process in response to an event.
	//
	// The handler inspects the root to determine which commands to execute, if
	// any. It may also schedule timeouts to "wake" the process at a later time.
	//
	// If this is the first event routed to this instance, the root is the
	// return value of New(). Otherwise, it's the value of the root as it
	// existed after handling the last event or timeout.
	//
	// The engine MAY provide specific guarantees about the order in which it
	// supplies events to the handler. To maximize portability across engines,
	// the handler SHOULD NOT assume any specific ordering. The engine MAY call
	// this method concurrently from separate goroutines or operating system
	// processes.
	//
	// The implementation SHOULD NOT impose a context deadline. Implement the
	// TimeoutHint() method instead.
	HandleEvent(context.Context, ProcessRoot, ProcessEventScope, Event) error

	// HandleTimeout continues the process in response to a timeout.
	//
	// The handler inspects the root to determine which commands to execute, if
	// any. It may also schedule timeout messages to "wake" the process at a
	// later time.
	//
	// The engine MUST NOT call this method before the timeout's scheduled time.
	// The engine MAY call this method concurrently from separate goroutines or
	// operating system processes.
	//
	// The implementation SHOULD NOT impose a context deadline. Implement the
	// TimeoutHint() method instead.
	HandleTimeout(context.Context, ProcessRoot, ProcessTimeoutScope, Timeout) error

	// TimeoutHint returns a suitable duration for handling the given message.
	//
	// The duration SHOULD be as short as possible. If no hint is available it
	// MUST be zero.
	//
	// In this context, "timeout" refers to a deadline, not a timeout message.
	TimeoutHint(Message) time.Duration
}

A ProcessMessageHandler models a business process.

Processes are useful for coordinating changes across aggregate instances and for modeling processes that include time-based logic.

Event messages can begin, advance or end a process. The process causes changes within the application by executing Command messages.

Processes are stateful. An application typically uses multiple instances of a process, each with its own state. For example, an e-commerce application may use one instance of the "checkout" process for each customer's shopping cart.

The state of each instance is application-defined. Often it's a tree of related entities and values. The ProcessRoot interface represents the "root" entity through which the handler accesses the instance's state.

Processes can also schedule Timeout messages. Timeouts model time in the business domain. For example, a timeout could trigger an email to a customer who added a product to their shopping cart but did not pay within one hour.

Process handlers SHOULD NOT directly perform write operations such as updating a database or invoking any API that does so.
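
The shopping cart reminder can be sketched as a process that schedules a timeout when a product is added and executes a command when the timeout fires. This is a reduced illustration, not the real interface. The context and error values are omitted, eventScope and timeoutScope are subsets of the documented scopes, and all message types, memoryScope and demo are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified local stand-ins for the dogma message types.
type (
	Command any
	Event   any
	Timeout any
)

// eventScope and timeoutScope are subsets of ProcessEventScope and
// ProcessTimeoutScope.
type eventScope interface {
	RecordedAt() time.Time
	ScheduleTimeout(Timeout, time.Time)
	End()
}

type timeoutScope interface {
	ExecuteCommand(Command)
}

// Hypothetical messages for a checkout process.
type (
	ProductAdded struct{ CartID string }
	OrderPaid    struct{ CartID string }
	ReminderDue  struct{ CartID string }
	SendReminder struct{ CartID string }
)

// checkout is the process root.
type checkout struct{ ReminderSent bool }

// handleEvent schedules a reminder one hour after a product is added and
// ends the process once the order is paid.
func handleEvent(r *checkout, s eventScope, e Event) {
	switch x := e.(type) {
	case ProductAdded:
		s.ScheduleTimeout(ReminderDue{CartID: x.CartID}, s.RecordedAt().Add(time.Hour))
	case OrderPaid:
		s.End()
	}
}

// handleTimeout sends at most one reminder per process instance.
func handleTimeout(r *checkout, s timeoutScope, t Timeout) {
	if x, ok := t.(ReminderDue); ok && !r.ReminderSent {
		s.ExecuteCommand(SendReminder{CartID: x.CartID})
		r.ReminderSent = true
	}
}

// memoryScope is a hypothetical in-memory scope used to observe the handler.
type memoryScope struct {
	recordedAt time.Time
	timeouts   []time.Time
	commands   []Command
	ended      bool
}

func (s *memoryScope) RecordedAt() time.Time { return s.recordedAt }
func (s *memoryScope) End()                  { s.ended = true }

func (s *memoryScope) ScheduleTimeout(_ Timeout, at time.Time) {
	s.timeouts = append(s.timeouts, at)
	s.ended = false // scheduling a timeout cancels a prior End()
}

func (s *memoryScope) ExecuteCommand(c Command) {
	s.commands = append(s.commands, c)
	s.ended = false // executing a command cancels a prior End()
}

// demo runs one cart through the process and reports the reminder delay,
// the number of commands executed and whether the process ended.
func demo() (time.Duration, int, bool) {
	root := &checkout{}
	s := &memoryScope{recordedAt: time.Date(2024, 3, 25, 12, 0, 0, 0, time.UTC)}
	handleEvent(root, s, ProductAdded{CartID: "cart-1"})
	handleTimeout(root, s, ReminderDue{CartID: "cart-1"})
	handleTimeout(root, s, ReminderDue{CartID: "cart-1"}) // ignored: already sent
	handleEvent(root, s, OrderPaid{CartID: "cart-1"})
	return s.timeouts[0].Sub(s.recordedAt), len(s.commands), s.ended
}

func main() {
	fmt.Println(demo())
}
```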

type ProcessRoot

type ProcessRoot interface{}

ProcessRoot is a "marker" interface for the domain-specific state of a specific process instance.

The interface is empty to allow use of any types supported by the engine.

var StatelessProcessRoot ProcessRoot = statelessProcessRoot{}

StatelessProcessRoot is an implementation of ProcessRoot for processes that do not require any domain-specific state.

StatelessProcessBehavior provides a partial implementation of ProcessMessageHandler that returns this value.

Engines MAY use this value as a sentinel to provide an optimized code path when no state is required.
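
A sentinel check of this kind could be as simple as an equality comparison. The sketch below redeclares the types locally and shows a hypothetical engine-side needsPersistence function. It is an assumption about how an engine might use the value, not a description of any particular engine.

```go
package main

import "fmt"

// ProcessRoot, statelessProcessRoot and StatelessProcessRoot are local
// stand-ins for the declarations in the dogma package.
type ProcessRoot interface{}

type statelessProcessRoot struct{}

var StatelessProcessRoot ProcessRoot = statelessProcessRoot{}

// checkout is a hypothetical stateful process root.
type checkout struct{ Items int }

// needsPersistence is a hypothetical engine-side check that skips state
// storage when the handler returned the sentinel value. Comparing two
// interface values of different dynamic types is always false, so the
// comparison is safe for any root type.
func needsPersistence(r ProcessRoot) bool {
	return r != StatelessProcessRoot
}

func main() {
	fmt.Println("stateless:", needsPersistence(StatelessProcessRoot))
	fmt.Println("checkout:", needsPersistence(&checkout{Items: 2}))
}
```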

type ProcessRoute added in v0.12.0

type ProcessRoute interface {
	Route
	// contains filtered or unexported methods
}

ProcessRoute describes a message type that's routed to or from a ProcessMessageHandler.

type ProcessTimeoutScope

type ProcessTimeoutScope interface {
	// InstanceID returns the ID of the process instance.
	InstanceID() string

	// End signals the end of the process.
	//
	// Ending a process instance destroys its state and cancels any pending
	// timeouts.
	//
	// The process instance ends once HandleTimeout() returns. Any future call
	// to ExecuteCommand() or ScheduleTimeout() on this scope prevents the
	// process from ending.
	//
	// "Re-beginning" a process instance that has ended has undefined behavior
	// and is NOT RECOMMENDED.
	End()

	// ExecuteCommand executes a command as a result of the timeout.
	//
	// Executing a command cancels any prior call to End() on this scope.
	ExecuteCommand(Command)

	// ScheduleTimeout schedules a timeout to occur at a specific time.
	//
	// Ending the process cancels any pending timeouts. Scheduling a timeout
	// cancels any prior call to End() on this scope.
	ScheduleTimeout(Timeout, time.Time)

	// ScheduledFor returns the time at which the timeout occurred.
	//
	// The time may be before the current time. For example, the engine may
	// deliver timeouts that were "missed" after recovering from downtime.
	ScheduledFor() time.Time

	// Log records an informational message.
	Log(format string, args ...any)
}

ProcessTimeoutScope performs engine operations within the context of a call to the HandleTimeout() method of a ProcessMessageHandler.

type ProjectionCompactScope added in v0.10.0

type ProjectionCompactScope interface {
	// Now returns the current engine time.
	//
	// The handler SHOULD use the returned time to implement compaction logic
	// that has some time-based component, such as removing data older than a
	// certain age.
	//
	// Under normal operating conditions the engine SHOULD return the current
	// local time. The engine MAY return a different time under some
	// circumstances, such as when executing tests.
	Now() time.Time

	// Log records an informational message.
	Log(format string, args ...any)
}

ProjectionCompactScope performs engine operations within the context of a call to the Compact() method of a ProjectionMessageHandler.
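
Taking the time from the scope rather than from time.Now() keeps compaction logic testable. The sketch below removes stale rows from an in-memory read model. compactScope is a subset of the documented interface, and fixedClock, compact, demo and the map-based read model are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// compactScope is the subset of ProjectionCompactScope used by this sketch.
type compactScope interface {
	Now() time.Time
}

// fixedClock is a hypothetical scope that returns a fixed time, as an
// engine might do when executing tests.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// compact removes rows last updated more than maxAge before the scope's
// current time and returns the number of rows removed. The rows map stands
// in for a real read model, keyed by row ID.
func compact(s compactScope, rows map[string]time.Time, maxAge time.Duration) int {
	cutoff := s.Now().Add(-maxAge)
	removed := 0
	for id, updated := range rows {
		if updated.Before(cutoff) {
			delete(rows, id) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// demo compacts a small read model with a 24 hour retention period and
// reports how many rows were removed and how many remain.
func demo() (removed, remaining int) {
	now := time.Date(2024, 3, 25, 0, 0, 0, 0, time.UTC)
	rows := map[string]time.Time{
		"a": now.Add(-48 * time.Hour),
		"b": now.Add(-2 * time.Hour),
		"c": now.Add(-25 * time.Hour),
	}
	removed = compact(fixedClock{t: now}, rows, 24*time.Hour)
	return removed, len(rows)
}

func main() {
	removed, remaining := demo()
	fmt.Println("removed:", removed, "remaining:", remaining)
}
```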

type ProjectionConfigurer

type ProjectionConfigurer interface {
	// Identity configures the handler's identity.
	//
	// n is a short human-readable name. It MUST be unique within the
	// application at any given time, but MAY change over the handler's
	// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
	// It MUST be between 1 and 255 bytes (not characters) in length.
	//
	// k is a unique key used to associate engine state with the handler. The
	// key SHOULD NOT change over the handler's lifetime. k MUST be an RFC 4122
	// UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
	//
	// Use of hard-coded literals for both values is RECOMMENDED.
	Identity(n string, k string)

	// Routes configures the engine to route certain message types to and from
	// the handler.
	//
	// Projection handlers support the HandlesEvent() route type.
	Routes(...ProjectionRoute)

	// DeliveryPolicy configures how the engine delivers events to the handler.
	//
	// The default policy is UnicastProjectionDeliveryPolicy.
	DeliveryPolicy(ProjectionDeliveryPolicy)
}

A ProjectionConfigurer configures the engine for use with a specific projection message handler.
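
The constraints on the handler name n can be checked mechanically. The sketch below is one possible reading of those rules using only the standard library; isValidName is a hypothetical helper, not part of Dogma, and an engine may validate names differently.

```go
package main

import (
	"fmt"
	"unicode"
	"unicode/utf8"
)

// isValidName reports whether n satisfies the documented rules for a handler
// name: 1 to 255 bytes long, valid UTF-8, and made up solely of printable,
// non-space characters.
func isValidName(n string) bool {
	// len() on a string counts bytes, not characters, which is what the
	// documentation asks for.
	if len(n) < 1 || len(n) > 255 {
		return false
	}
	if !utf8.ValidString(n) {
		return false
	}
	for _, r := range n {
		// unicode.IsPrint accepts the ASCII space, so spaces are rejected
		// explicitly.
		if unicode.IsSpace(r) || !unicode.IsPrint(r) {
			return false
		}
	}
	return true
}

func main() {
	for _, n := range []string{"account-list", "", "has space"} {
		fmt.Printf("%q valid: %t\n", n, isValidName(n))
	}
}
```

Note that a name made of multi-byte characters can exceed the limit well before it reaches 255 characters.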

type ProjectionDeliveryPolicy added in v0.12.0

type ProjectionDeliveryPolicy interface {
	// contains filtered or unexported methods
}

A ProjectionDeliveryPolicy describes how to deliver events to a projection message handler on engines that support concurrent or distributed execution of a single Dogma application.

type ProjectionEventScope

type ProjectionEventScope interface {
	// RecordedAt returns the time at which the event occurred.
	RecordedAt() time.Time

	// IsPrimaryDelivery returns true on one of the application instances that
	// receive the event, and false on all other instances.
	//
	// This method is useful when the projection must perform some specific
	// operation once per event, such as updating a shared resource that's used
	// by all applications, while still delivering the event to all instances of
	// the application.
	IsPrimaryDelivery() bool

	// Log records an informational message.
	Log(format string, args ...any)
}

ProjectionEventScope performs engine operations within the context of a call to the HandleEvent() method of a ProjectionMessageHandler.
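
To illustrate how IsPrimaryDelivery() might be used, the sketch below models two application instances that both receive the same event. Each instance updates its own local state, but only the primary delivery touches the shared resource. It does not import Dogma: eventScope mirrors part of the interface above, and fakeScope, instance and runDemo are hypothetical names. It assumes a delivery policy that sends each event to every instance.

```go
package main

import "fmt"

// eventScope is a local stand-in for part of ProjectionEventScope.
type eventScope interface {
	IsPrimaryDelivery() bool
	Log(format string, args ...any)
}

// fakeScope is a hypothetical scope as seen by one application instance.
type fakeScope struct{ primary bool }

func (s fakeScope) IsPrimaryDelivery() bool { return s.primary }

func (s fakeScope) Log(format string, args ...any) {
	fmt.Printf(format+"\n", args...)
}

// instance models one running copy of the application.
type instance struct {
	local  int  // per-instance state, such as an in-memory cache
	shared *int // state shared by every instance, such as a database row
}

// handle applies one event. Every instance updates its local state, but only
// the primary delivery updates the shared resource.
func (i *instance) handle(s eventScope) {
	i.local++
	if s.IsPrimaryDelivery() {
		*i.shared++
		s.Log("primary delivery: updated the shared resource")
	}
}

// runDemo delivers a single event to two instances.
func runDemo() (localA, localB, shared int) {
	a := &instance{shared: &shared}
	b := &instance{shared: &shared}
	a.handle(fakeScope{primary: true})
	b.handle(fakeScope{primary: false})
	return a.local, b.local, shared
}

func main() {
	fmt.Println(runDemo())
}
```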

type ProjectionMessageHandler

type ProjectionMessageHandler interface {
	// Configure describes the handler's configuration to the engine.
	Configure(ProjectionConfigurer)

	// HandleEvent updates the projection to reflect the occurrence of an event.
	//
	// r, c and n are the inputs to the OCC store.
	//
	//   - r is a key that identifies some engine-defined resource
	//   - c is the engine's perception of the current version of r
	//   - n is the next version of r, made by handling this event
	//
	// If c is the current version of r in the OCC store, the method MUST
	// attempt to atomically update the projection and the version of r to be n.
	// On success, ok is true and err is nil.
	//
	// If c is not the current version of r an OCC conflict has occurred. The
	// method MUST return with ok set to false and without updating the
	// projection.
	//
	// r, c and n are engine-defined; the application SHOULD NOT infer any
	// meaning from their content. The "current" version of a new resource is
	// the empty byte-slice. nil and empty slices are interchangeable.
	//
	// The engine MAY provide specific guarantees about the order in which it
	// supplies events to the handler. To maximize portability across engines,
	// the handler SHOULD NOT assume any specific ordering. The engine MAY call
	// this method concurrently from separate goroutines or operating system
	// processes.
	//
	// The implementation SHOULD NOT impose a context deadline. Implement the
	// TimeoutHint() method instead.
	HandleEvent(
		ctx context.Context,
		r, c, n []byte,
		s ProjectionEventScope,
		e Event,
	) (ok bool, err error)

	// ResourceVersion returns the current version of a resource.
	//
	// It returns an empty slice if r is not in the OCC store.
	ResourceVersion(ctx context.Context, r []byte) ([]byte, error)

	// CloseResource informs the handler that the engine has no further use for
	// a resource.
	//
	// If r is present in the OCC store the handler SHOULD remove it.
	CloseResource(ctx context.Context, r []byte) error

	// TimeoutHint returns a suitable duration for handling the given event.
	//
	// The duration SHOULD be as short as possible. If no hint is available it
	// MUST be zero.
	//
	// In this context, "timeout" refers to a deadline, not a timeout message.
	TimeoutHint(Message) time.Duration

	// Compact attempts to reduce the size of the projection.
	//
	// For example, it may delete unused data, or merge overly granular data.
	//
	// The handler SHOULD compact the projection incrementally such that it
	// makes some progress even if the context's deadline expires.
	Compact(context.Context, ProjectionCompactScope) error
}

A ProjectionMessageHandler builds a projection from events.

The term "read-model" is often used interchangeably with "projection".

Projections use an optimistic concurrency control (OCC) protocol to ensure that the engine applies each event to the projection exactly once.

The OCC protocol uses a key/value store that associates engine-defined "resources" with their "version". These are the keys and values, respectively.

The OCC store can be challenging to implement. The [projectionkit] module provides adaptors that implement the OCC protocol using various popular database systems.

[projectionkit]: github.com/dogmatiq/projectionkit
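
The r, c and n contract is easier to follow with a concrete store. The sketch below is a minimal in-memory illustration of the protocol, not the projectionkit implementation and not a complete ProjectionMessageHandler. The memoryProjection type, its methods and the integer "projection" are all assumptions made for this example. A single mutex guards both the OCC store and the projected data so that the two change atomically.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// memoryProjection is a hypothetical in-memory projection that keeps its OCC
// store and its projected data under one mutex.
type memoryProjection struct {
	mu       sync.Mutex
	versions map[string][]byte // OCC store: resource -> version
	total    int               // the projected data, here a simple sum
}

// apply mirrors the r, c, n contract of HandleEvent. It updates the
// projection only if c matches the stored version of r, then stores n as the
// new version of r.
func (p *memoryProjection) apply(r, c, n []byte, delta int) (ok bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// A missing resource yields a nil slice. bytes.Equal treats nil and
	// empty slices as equal, matching the documented equivalence.
	if !bytes.Equal(p.versions[string(r)], c) {
		return false // OCC conflict: leave the projection untouched
	}

	if p.versions == nil {
		p.versions = map[string][]byte{}
	}
	p.versions[string(r)] = append([]byte(nil), n...) // copy; don't alias n
	p.total += delta
	return true
}

// resourceVersion returns the stored version of r, or nil if r is unknown.
func (p *memoryProjection) resourceVersion(r []byte) []byte {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.versions[string(r)]
}

// closeResource removes r from the OCC store.
func (p *memoryProjection) closeResource(r []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.versions, string(r))
}

func main() {
	p := &memoryProjection{}
	r := []byte("stream-1")

	fmt.Println(p.apply(r, nil, []byte("v1"), 10))         // first delivery
	fmt.Println(p.apply(r, nil, []byte("v1"), 10))         // redelivery conflicts
	fmt.Println(p.apply(r, []byte("v1"), []byte("v2"), 5)) // next event
	fmt.Println(p.total)
}
```

The redelivered event is rejected because its c no longer matches the stored version, which is how the protocol keeps an event from being applied twice. A database-backed store would need the same atomicity, typically by updating the version and the projection within one transaction.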

type ProjectionRoute added in v0.12.0

type ProjectionRoute interface {
	Route
	// contains filtered or unexported methods
}

ProjectionRoute describes a message type that's routed to a ProjectionMessageHandler.

type RecordsEventOption added in v0.13.0

type RecordsEventOption struct{}

RecordsEventOption is an option that affects the behavior of the route returned by RecordsEvent.

type RecordsEventRoute added in v0.12.0

type RecordsEventRoute struct{ Type reflect.Type }

RecordsEventRoute describes a route for a handler that records an Event of a specific type.

func RecordsEvent added in v0.12.0

func RecordsEvent[T Event](...RecordsEventOption) RecordsEventRoute

RecordsEvent routes event messages recorded by an AggregateMessageHandler or IntegrationMessageHandler.

It's used as an argument to the Routes() method of AggregateConfigurer or IntegrationConfigurer.

An application MUST NOT route a single event type from more than one handler.
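
RecordsEvent takes its event type as a type parameter rather than as a value. The sketch below shows, under stated assumptions, how a generic function of this shape can capture a reflect.Type, and how the "more than one handler" rule could be checked. It is not Dogma's implementation: eventRoute, recordsEvent, conflicts and the two event types are hypothetical names defined locally.

```go
package main

import (
	"fmt"
	"reflect"
)

// AccountOpened and AccountClosed are hypothetical event types.
type AccountOpened struct{}
type AccountClosed struct{}

// eventRoute is a local stand-in with the same shape as RecordsEventRoute.
type eventRoute struct{ Type reflect.Type }

// recordsEvent captures the type argument T as a reflect.Type without
// needing a value of type T.
func recordsEvent[T any]() eventRoute {
	return eventRoute{Type: reflect.TypeOf((*T)(nil)).Elem()}
}

// conflicts returns the event types routed from both a and b, in the order
// they appear in b.
func conflicts(a, b []eventRoute) []reflect.Type {
	seen := map[reflect.Type]bool{}
	for _, r := range a {
		seen[r.Type] = true
	}

	var out []reflect.Type
	for _, r := range b {
		if seen[r.Type] {
			out = append(out, r.Type)
		}
	}
	return out
}

func main() {
	aggregate := []eventRoute{
		recordsEvent[AccountOpened](),
		recordsEvent[AccountClosed](),
	}
	integration := []eventRoute{
		recordsEvent[AccountOpened](), // also recorded by the aggregate
	}

	for _, t := range conflicts(aggregate, integration) {
		fmt.Printf("%s is recorded by more than one handler\n", t.Name())
	}
}
```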

type RegisterAggregateOption added in v0.13.0

type RegisterAggregateOption struct{}

RegisterAggregateOption is an option that affects the behavior of a call to the RegisterAggregate() method of the ApplicationConfigurer interface.

type RegisterIntegrationOption added in v0.13.0

type RegisterIntegrationOption struct{}

RegisterIntegrationOption is an option that affects the behavior of a call to the RegisterIntegration() method of the ApplicationConfigurer interface.

type RegisterProcessOption added in v0.13.0

type RegisterProcessOption struct{}

RegisterProcessOption is an option that affects the behavior of a call to the RegisterProcess() method of the ApplicationConfigurer interface.

type RegisterProjectionOption added in v0.13.0

type RegisterProjectionOption struct{}

RegisterProjectionOption is an option that affects the behavior of a call to the RegisterProjection() method of the ApplicationConfigurer interface.

type Route added in v0.12.0

type Route interface {
	// contains filtered or unexported methods
}

Route is an interface implemented by all route types.

type SchedulesTimeoutOption added in v0.13.0

type SchedulesTimeoutOption struct{}

SchedulesTimeoutOption is an option that affects the behavior of the route returned by SchedulesTimeout.

type SchedulesTimeoutRoute added in v0.12.0

type SchedulesTimeoutRoute struct{ Type reflect.Type }

SchedulesTimeoutRoute describes a route for a handler that schedules a Timeout of a specific type.

func SchedulesTimeout added in v0.12.0

func SchedulesTimeout[T Timeout](...SchedulesTimeoutOption) SchedulesTimeoutRoute

SchedulesTimeout routes timeout messages scheduled by a ProcessMessageHandler.

It's used as an argument to the Routes() method of ProcessConfigurer.

An application MAY use a single timeout type with more than one process.

type StatelessProcessBehavior

type StatelessProcessBehavior struct{}

StatelessProcessBehavior is an embeddable type for ProcessMessageHandler implementations that do not have any domain-specific state.

func (StatelessProcessBehavior) New

func (StatelessProcessBehavior) New() ProcessRoot

New returns StatelessProcessRoot.

type Timeout added in v0.12.0

type Timeout = Message

A Timeout is a message that represents a request for an action to be performed at a specific time.

type UnicastProjectionDeliveryPolicy added in v0.12.0

type UnicastProjectionDeliveryPolicy struct{}

UnicastProjectionDeliveryPolicy is the default ProjectionDeliveryPolicy. It delivers each event to a single instance of the application.

Directories

Path Synopsis
Package fixtures is a set of test fixtures and mocks of the various Dogma interfaces.
