cqrs

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2020 License: MIT Imports: 10 Imported by: 39

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FullyQualifiedStructName added in v1.0.0

func FullyQualifiedStructName(v interface{}) string

FullyQualifiedStructName name returns object name in format [package].[type name]. It ignores if the value is a pointer or not.

func NamedStruct added in v1.0.0

func NamedStruct(fallback func(v interface{}) string) func(v interface{}) string

NamedStruct returns the name from a message implementing the following interface:

type namedStruct interface {
	Name() string
}

It ignores if the value is a pointer or not.

func StructName added in v1.0.0

func StructName(v interface{}) string

StructName name returns struct name in format [type name]. It ignores if the value is a pointer or not.

Types

type CommandBus

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus transports commands to command handlers.

func NewCommandBus

func NewCommandBus(
	publisher message.Publisher,
	generateTopic func(commandName string) string,
	marshaler CommandEventMarshaler,
) (*CommandBus, error)

func (CommandBus) Send

func (c CommandBus) Send(ctx context.Context, cmd interface{}) error

Send sends command to the command bus.

type CommandEventMarshaler

type CommandEventMarshaler interface {
	// Marshal marshals Command or Event to Watermill's message.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal unmarshals watermill's message to v Command or Event.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name returns the name of Command or Event.
	// Name is used to determine, that received command or event is event which we want to handle.
	Name(v interface{}) string

	// NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal).
	//
	// When we have Command or Event marshaled to Watermill's message,
	// we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
	NameFromMessage(msg *message.Message) string
}

CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa. Payload of the command needs to be marshaled to []bytes.

type CommandHandler

type CommandHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to CommandsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages**!
	HandlerName() string

	NewCommand() interface{}

	Handle(ctx context.Context, cmd interface{}) error
}

CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.

In contrast to EvenHandler, every Command must have only one CommandHandler.

type CommandProcessor

type CommandProcessor struct {
	// contains filtered or unexported fields
}

CommandProcessor determines which CommandHandler should handle the command received from the command bus.

func NewCommandProcessor

func NewCommandProcessor(
	handlers []CommandHandler,
	generateTopic func(commandName string) string,
	subscriberConstructor CommandsSubscriberConstructor,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) (*CommandProcessor, error)

func (CommandProcessor) AddHandlersToRouter

func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error

func (CommandProcessor) Handlers

func (p CommandProcessor) Handlers() []CommandHandler

type CommandsSubscriberConstructor added in v0.4.0

type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error)

CommandsSubscriberConstructor creates subscriber for CommandHandler. It allows you to create a separate customized Subscriber for every command handler.

type DuplicateCommandHandlerError added in v0.4.0

type DuplicateCommandHandlerError struct {
	CommandName string
}

func (DuplicateCommandHandlerError) Error added in v0.4.0

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus transports events to event handlers.

func NewEventBus

func NewEventBus(
	publisher message.Publisher,
	generateTopic func(eventName string) string,
	marshaler CommandEventMarshaler,
) (*EventBus, error)

func (EventBus) Publish

func (c EventBus) Publish(ctx context.Context, event interface{}) error

Publish sends event to the event bus.

type EventHandler

type EventHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to EventsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages** !!!
	HandlerName() string

	NewEvent() interface{}

	Handle(ctx context.Context, event interface{}) error
}

EventHandler receives events defined by NewEvent and handles them with its Handle method. If using DDD, CommandHandler may modify and persist the aggregate. It can also invoke a process manager, a saga or just build a read model.

In contrast to CommandHandler, every Event can have multiple EventHandlers.

type EventProcessor

type EventProcessor struct {
	// contains filtered or unexported fields
}

EventProcessor determines which EventHandler should handle event received from event bus.

func NewEventProcessor

func NewEventProcessor(
	handlers []EventHandler,
	generateTopic func(eventName string) string,
	subscriberConstructor EventsSubscriberConstructor,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) (*EventProcessor, error)

func (EventProcessor) AddHandlersToRouter

func (p EventProcessor) AddHandlersToRouter(r *message.Router) error

func (EventProcessor) Handlers

func (p EventProcessor) Handlers() []EventHandler

type EventsSubscriberConstructor added in v0.4.0

type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error)

EventsSubscriberConstructor creates a subscriber for EventHandler. It allows you to create separated customized Subscriber for every command handler.

type Facade

type Facade struct {
	// contains filtered or unexported fields
}

Facade is a facade for creating the Command and Event buses and processors. It was created to avoid boilerplate, when using CQRS in the standard way. You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.

func NewFacade

func NewFacade(config FacadeConfig) (*Facade, error)

func (Facade) CommandBus

func (f Facade) CommandBus() *CommandBus

func (Facade) CommandEventMarshaler

func (f Facade) CommandEventMarshaler() CommandEventMarshaler

func (Facade) EventBus

func (f Facade) EventBus() *EventBus

type FacadeConfig

type FacadeConfig struct {
	// GenerateCommandsTopic generates topic name based on the command name.
	// Command name is generated by CommandEventMarshaler's Name method.
	//
	// It allows you to use topic per command or one topic for every command. [todo - add to doc]
	GenerateCommandsTopic func(commandName string) string

	// CommandHandlers return command handlers which should be executed.
	CommandHandlers func(commandBus *CommandBus, eventBus *EventBus) []CommandHandler

	// CommandsPublisher is Publisher used to publish commands.
	CommandsPublisher message.Publisher

	// CommandsSubscriberConstructor is constructor for subscribers which will subscribe for messages.
	// It will be called for every command handler.
	// It allows you to create separated customized Subscriber for every command handler.
	CommandsSubscriberConstructor CommandsSubscriberConstructor

	// GenerateEventsTopic generates topic name based on the event name.
	// Event name is generated by CommandEventMarshaler's Name method.
	//
	// It allows you to use topic per command or one topic for every command. [todo - add to doc]
	GenerateEventsTopic func(eventName string) string

	// EventHandlers return event handlers which should be executed.
	EventHandlers func(commandBus *CommandBus, eventBus *EventBus) []EventHandler

	// EventsPublisher is Publisher used to publish commands.
	EventsPublisher message.Publisher

	// EventsSubscriberConstructor is constructor for subscribers which will subscribe for messages.
	// It will be called for every event handler.
	// It allows you to create separated customized Subscriber for every event handler.
	EventsSubscriberConstructor EventsSubscriberConstructor

	// Router is a Watermill router, which will be used to handle events and commands.
	// Router handlers will be automatically generated by AddHandlersToRouter of Command and Event handlers.
	Router *message.Router

	CommandEventMarshaler CommandEventMarshaler

	Logger watermill.LoggerAdapter
}

func (FacadeConfig) CommandsEnabled

func (c FacadeConfig) CommandsEnabled() bool

func (FacadeConfig) EventsEnabled

func (c FacadeConfig) EventsEnabled() bool

func (FacadeConfig) Validate

func (c FacadeConfig) Validate() error

type JSONMarshaler

type JSONMarshaler struct {
	NewUUID      func() string
	GenerateName func(v interface{}) string
}

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(v interface{}) (*message.Message, error)

func (JSONMarshaler) Name

func (m JSONMarshaler) Name(cmdOrEvent interface{}) string

func (JSONMarshaler) NameFromMessage

func (m JSONMarshaler) NameFromMessage(msg *message.Message) string

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)

type NoProtoMessageError

type NoProtoMessageError struct {
	// contains filtered or unexported fields
}

func (NoProtoMessageError) Error

func (e NoProtoMessageError) Error() string

type NonPointerError

type NonPointerError struct {
	Type reflect.Type
}

func (NonPointerError) Error

func (e NonPointerError) Error() string

type ProtobufMarshaler

type ProtobufMarshaler struct {
	NewUUID      func() string
	GenerateName func(v interface{}) string
}

func (ProtobufMarshaler) Marshal

func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error)

func (ProtobufMarshaler) Name

func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string

func (ProtobufMarshaler) NameFromMessage

func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string

func (ProtobufMarshaler) Unmarshal

func (ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL