cqrs

package
v1.3.5
Warning

This package is not in the latest version of its module.

Published: Sep 24, 2023 License: MIT Imports: 11 Imported by: 39

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CtxWithOriginalMessage added in v1.3.5

func CtxWithOriginalMessage(ctx context.Context, msg *message.Message) context.Context

CtxWithOriginalMessage returns a new context with the original message attached.

func FullyQualifiedStructName added in v1.0.0

func FullyQualifiedStructName(v interface{}) string

FullyQualifiedStructName returns the object name in the format [package].[type name]. The result is the same whether the value is a pointer or not.

func NamedStruct added in v1.0.0

func NamedStruct(fallback func(v interface{}) string) func(v interface{}) string

NamedStruct returns the name from a message implementing the following interface:

type namedStruct interface {
	Name() string
}

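The result is the same whether the value is a pointer or not. If the message does not implement the interface, the name is taken from the fallback function.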
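The fallback behaviour can be illustrated with a small standalone sketch. It does not import the package: `named`, `namedOrFallback`, `OrderPlaced` and `OrderCancelled` are hypothetical names, and the real NamedStruct may differ in detail.

```go
package main

import "fmt"

// named mirrors the unexported namedStruct interface shown above.
type named interface {
	Name() string
}

// namedOrFallback is a hypothetical stand-in for NamedStruct: the returned
// function prefers v.Name() and otherwise delegates to fallback.
func namedOrFallback(fallback func(v interface{}) string) func(v interface{}) string {
	return func(v interface{}) string {
		if n, ok := v.(named); ok {
			return n.Name()
		}
		return fallback(v)
	}
}

// OrderPlaced declares Name with a value receiver, so both OrderPlaced and
// *OrderPlaced satisfy the interface.
type OrderPlaced struct{}

func (OrderPlaced) Name() string { return "orders.placed" }

// OrderCancelled has no Name method, so the fallback decides its name.
type OrderCancelled struct{}

func main() {
	name := namedOrFallback(func(v interface{}) string { return "unnamed" })

	fmt.Println(name(OrderPlaced{}))    // orders.placed
	fmt.Println(name(&OrderPlaced{}))   // orders.placed
	fmt.Println(name(OrderCancelled{})) // unnamed
}
```

In this sketch the pointer case works because Name has a value receiver; with a pointer receiver only the pointer would satisfy the interface.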

func OriginalMessageFromCtx added in v1.3.5

func OriginalMessageFromCtx(ctx context.Context) *message.Message

OriginalMessageFromCtx returns the original message that was received by the event/command handler.
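CtxWithOriginalMessage and OriginalMessageFromCtx form a store/read pair over context.Context. The sketch below shows that general pattern with the standard library only. `Message`, `ctxKey`, `withOriginalMessage`, `originalMessage` and `roundTrip` are hypothetical stand-ins, and returning nil when nothing is attached is an assumption of this sketch, not a documented guarantee.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for Watermill's *message.Message.
type Message struct {
	UUID string
}

// ctxKey is an unexported type, so no other package can collide with the key.
type ctxKey struct{}

// withOriginalMessage returns a child context that carries msg.
func withOriginalMessage(ctx context.Context, msg *Message) context.Context {
	return context.WithValue(ctx, ctxKey{}, msg)
}

// originalMessage returns the attached message, or nil if none is present.
func originalMessage(ctx context.Context) *Message {
	msg, _ := ctx.Value(ctxKey{}).(*Message)
	return msg
}

// roundTrip attaches msg to a fresh context and reads it back.
func roundTrip(msg *Message) *Message {
	return originalMessage(withOriginalMessage(context.Background(), msg))
}

func main() {
	fmt.Println(roundTrip(&Message{UUID: "msg-1"}).UUID)      // msg-1
	fmt.Println(originalMessage(context.Background()) == nil) // true
}
```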

func StructName added in v1.0.0

func StructName(v interface{}) string

StructName returns the struct name in the format [type name]. The result is the same whether the value is a pointer or not.
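To make the two name formats concrete, here is a minimal sketch that derives them from the dynamic type with the standard library. `fullyQualifiedName` and `shortName` are hypothetical helpers written for illustration; they are not claimed to match the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// fullyQualifiedName formats the dynamic type with %T and strips leading '*'
// characters, so pointers and values yield the same [package].[type name].
func fullyQualifiedName(v interface{}) string {
	return strings.TrimLeft(fmt.Sprintf("%T", v), "*")
}

// shortName keeps only the part after the last dot, i.e. [type name].
func shortName(v interface{}) string {
	full := fullyQualifiedName(v)
	return full[strings.LastIndex(full, ".")+1:]
}

type OrderPlaced struct{}

func main() {
	fmt.Println(fullyQualifiedName(&OrderPlaced{})) // main.OrderPlaced
	fmt.Println(shortName(&OrderPlaced{}))          // OrderPlaced
	fmt.Println(shortName(OrderPlaced{}))           // OrderPlaced
}
```

The short form is convenient for topic names, while the qualified form avoids collisions between types with the same name in different packages.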

Types

type CommandBus

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus transports commands to command handlers.

func NewCommandBus

func NewCommandBus(
	publisher message.Publisher,
	generateTopic func(commandName string) string,
	marshaler CommandEventMarshaler,
) (*CommandBus, error)

NewCommandBus creates a new CommandBus. Deprecated: use NewCommandBusWithConfig instead.

func NewCommandBusWithConfig added in v1.3.0

func NewCommandBusWithConfig(publisher message.Publisher, config CommandBusConfig) (*CommandBus, error)

NewCommandBusWithConfig creates a new CommandBus.

func (CommandBus) Send

func (c CommandBus) Send(ctx context.Context, cmd any) error

Send sends command to the command bus.

func (CommandBus) SendWithModifiedMessage added in v1.3.5

func (c CommandBus) SendWithModifiedMessage(ctx context.Context, cmd any, modify func(*message.Message) error) error

type CommandBusConfig added in v1.3.0

type CommandBusConfig struct {
	// GeneratePublishTopic is used to generate topic for publishing command.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend is called before publishing the command.
	// The *message.Message can be modified.
	//
	// This option is not required.
	OnSend CommandBusOnSendFn

	// Marshaler is used to marshal and unmarshal commands.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter
}

func (CommandBusConfig) Validate added in v1.3.0

func (c CommandBusConfig) Validate() error

type CommandBusGeneratePublishTopicFn added in v1.3.0

type CommandBusGeneratePublishTopicFn func(CommandBusGeneratePublishTopicParams) (string, error)

type CommandBusGeneratePublishTopicParams added in v1.3.0

type CommandBusGeneratePublishTopicParams struct {
	CommandName string
	Command     any
}
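A topic generator receives the command name and the command itself and returns a topic. The sketch below contrasts a topic per command with one shared topic. `publishTopicParams` is a hypothetical local copy of the params struct, and the topic strings are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// publishTopicParams is a hypothetical local copy of
// CommandBusGeneratePublishTopicParams.
type publishTopicParams struct {
	CommandName string
	Command     any
}

// topicPerCommand gives every command type its own topic.
func topicPerCommand(p publishTopicParams) (string, error) {
	if p.CommandName == "" {
		return "", errors.New("empty command name")
	}
	return "commands." + p.CommandName, nil
}

// sharedTopic routes all commands through a single topic.
func sharedTopic(publishTopicParams) (string, error) {
	return "commands", nil
}

func main() {
	p := publishTopicParams{CommandName: "BookRoom"}

	topic, _ := topicPerCommand(p)
	fmt.Println(topic) // commands.BookRoom

	topic, _ = sharedTopic(p)
	fmt.Println(topic) // commands
}
```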

type CommandBusOnSendFn added in v1.3.0

type CommandBusOnSendFn func(params CommandBusOnSendParams) error

type CommandBusOnSendParams added in v1.3.0

type CommandBusOnSendParams struct {
	CommandName string
	Command     any

	// Message is never nil and can be modified.
	Message *message.Message
}

type CommandEventMarshaler

type CommandEventMarshaler interface {
	// Marshal marshals Command or Event to Watermill's message.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal unmarshals watermill's message to v Command or Event.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name returns the name of Command or Event.
	// Name is used to determine whether a received command or event is the one we want to handle.
	Name(v interface{}) string

	// NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal).
	//
	// When we have Command or Event marshaled to Watermill's message,
	// we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
	NameFromMessage(msg *message.Message) string
}

CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa. The payload of the command or event needs to be marshaled to []byte.
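The relationship between the four methods is easier to see in a toy implementation. The following sketch uses a hypothetical `Message` stand-in instead of Watermill's message type and stores the name under a made-up `"name"` metadata key; the real marshalers may use a different key and set additional fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for Watermill's message.Message.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

// jsonMarshaler is a toy marshaler with the same four methods as the interface.
type jsonMarshaler struct {
	generateName func(v interface{}) string
}

// Marshal encodes v as JSON and records its name in the metadata.
func (m jsonMarshaler) Marshal(v interface{}) (*Message, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &Message{
		Metadata: map[string]string{"name": m.Name(v)},
		Payload:  payload,
	}, nil
}

// Unmarshal decodes the payload into v, which must be a pointer.
func (m jsonMarshaler) Unmarshal(msg *Message, v interface{}) error {
	return json.Unmarshal(msg.Payload, v)
}

// Name derives the name from a Go value.
func (m jsonMarshaler) Name(v interface{}) string { return m.generateName(v) }

// NameFromMessage reads the name back without touching the payload.
func (m jsonMarshaler) NameFromMessage(msg *Message) string {
	return msg.Metadata["name"]
}

type BookRoom struct {
	RoomID string `json:"room_id"`
}

func main() {
	m := jsonMarshaler{generateName: func(v interface{}) string { return "BookRoom" }}

	msg, err := m.Marshal(BookRoom{RoomID: "101"})
	if err != nil {
		panic(err)
	}
	fmt.Println(m.NameFromMessage(msg)) // BookRoom

	var decoded BookRoom
	if err := m.Unmarshal(msg, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.RoomID) // 101
}
```

Reading the name from metadata is what lets a processor decide whether a message is relevant before paying for an Unmarshal.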

type CommandHandler

type CommandHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to CommandsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result in **reconsuming all messages**!
	HandlerName() string

	NewCommand() any

	Handle(ctx context.Context, cmd any) error
}

CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.

In contrast to EventHandler, every Command must have only one CommandHandler.

One instance of CommandHandler is used during handling messages. When multiple commands are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!
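Because a single handler instance can be called concurrently, any state it keeps needs protection. Below is a minimal sketch of a handler that guards a map with a mutex. The interface is copied locally so the example compiles without the package, and `BookRoom`, `bookRoomHandler` and `handleConcurrently` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// CommandHandler is a local copy of the interface documented above.
type CommandHandler interface {
	HandlerName() string
	NewCommand() any
	Handle(ctx context.Context, cmd any) error
}

type BookRoom struct{ RoomID string }

// bookRoomHandler keeps shared state, so every access goes through mu.
type bookRoomHandler struct {
	mu     sync.Mutex
	booked map[string]int
}

func (h *bookRoomHandler) HandlerName() string { return "BookRoomHandler" }

// NewCommand returns an empty command for the processor to unmarshal into.
func (h *bookRoomHandler) NewCommand() any { return &BookRoom{} }

func (h *bookRoomHandler) Handle(ctx context.Context, cmd any) error {
	c, ok := cmd.(*BookRoom)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	h.booked[c.RoomID]++
	return nil
}

// handleConcurrently calls Handle from n goroutines, as a router might when
// several commands arrive at once, and returns the resulting count.
func handleConcurrently(h *bookRoomHandler, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = h.Handle(context.Background(), &BookRoom{RoomID: "101"})
		}()
	}
	wg.Wait()

	h.mu.Lock()
	defer h.mu.Unlock()
	return h.booked["101"]
}

func main() {
	h := &bookRoomHandler{booked: map[string]int{}}
	var _ CommandHandler = h // compile-time check that the interface is satisfied

	fmt.Println(handleConcurrently(h, 100)) // 100
}
```

Without the mutex, concurrent writes to the map would be a data race and can crash the program.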

func NewCommandHandler added in v1.3.0

func NewCommandHandler[Command any](
	handlerName string,
	handleFunc func(ctx context.Context, cmd *Command) error,
) CommandHandler

NewCommandHandler creates a new CommandHandler implementation based on provided function and command type inferred from function argument.
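One way such a generic adapter can work is sketched below: the type parameter supplies both the empty command returned by NewCommand and the type assertion in Handle. `genericHandler` and `bookRooms` are hypothetical, and this is an illustration of the idea rather than the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// genericHandler adapts a typed function to the untyped handler methods.
type genericHandler[Command any] struct {
	name       string
	handleFunc func(ctx context.Context, cmd *Command) error
}

func (h genericHandler[Command]) HandlerName() string { return h.name }

// NewCommand returns a fresh *Command; the type comes from the type parameter.
func (h genericHandler[Command]) NewCommand() any { return new(Command) }

// Handle asserts the concrete type before calling the typed function.
func (h genericHandler[Command]) Handle(ctx context.Context, cmd any) error {
	typed, ok := cmd.(*Command)
	if !ok {
		return fmt.Errorf("expected %T, got %T", new(Command), cmd)
	}
	return h.handleFunc(ctx, typed)
}

type BookRoom struct{ RoomID string }

// bookRooms drives the handler the way a processor might: obtain an empty
// command, fill it in (standing in for unmarshaling), then call Handle.
func bookRooms(roomIDs ...string) ([]string, error) {
	var booked []string
	h := genericHandler[BookRoom]{
		name: "BookRoomHandler",
		handleFunc: func(ctx context.Context, cmd *BookRoom) error {
			booked = append(booked, cmd.RoomID)
			return nil
		},
	}
	for _, id := range roomIDs {
		cmd := h.NewCommand().(*BookRoom)
		cmd.RoomID = id
		if err := h.Handle(context.Background(), cmd); err != nil {
			return nil, err
		}
	}
	return booked, nil
}

func main() {
	booked, err := bookRooms("101", "102")
	fmt.Println(booked, err) // [101 102] <nil>
}
```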

type CommandProcessor

type CommandProcessor struct {
	// contains filtered or unexported fields
}

CommandProcessor determines which CommandHandler should handle the command received from the command bus.

func NewCommandProcessor

func NewCommandProcessor(
	handlers []CommandHandler,
	generateTopic func(commandName string) string,
	subscriberConstructor CommandsSubscriberConstructor,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) (*CommandProcessor, error)

NewCommandProcessor creates a new CommandProcessor. Deprecated. Use NewCommandProcessorWithConfig instead.

func NewCommandProcessorWithConfig added in v1.3.0

func NewCommandProcessorWithConfig(router *message.Router, config CommandProcessorConfig) (*CommandProcessor, error)

func (*CommandProcessor) AddHandlers added in v1.3.0

func (p *CommandProcessor) AddHandlers(handlers ...CommandHandler) error

AddHandlers adds a new CommandHandler to the CommandProcessor and adds it to the router.

func (CommandProcessor) AddHandlersToRouter

func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error

AddHandlersToRouter adds the CommandProcessor's handlers to the given router. It should be called only once per CommandProcessor instance.

It is required to call AddHandlersToRouter only if command processor is created with NewCommandProcessor (disableRouterAutoAddHandlers is set to true). Deprecated: please migrate to command processor created by NewCommandProcessorWithConfig.

func (CommandProcessor) Handlers

func (p CommandProcessor) Handlers() []CommandHandler

Handlers returns the CommandProcessor's handlers.

type CommandProcessorConfig added in v1.3.0

type CommandProcessorConfig struct {
	// GenerateSubscribeTopic is used to generate topic for subscribing command.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor is used to create subscriber for CommandHandler.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle is called before handling command.
	// OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a command.
	//
	// Because of that, you need to explicitly call params.Handler.Handle() to handle the command.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // logic before handle
	//      //  (...)
	//
	//      err = params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // logic after handle
	//      //  (...)
	//
	//      return err
	//  }
	//
	// This option is not required.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler is used to marshal and unmarshal commands.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter

	// If true, CommandProcessor will ack messages even if CommandHandler returns an error.
	// If RequestReplyBackend is not null and sending reply fails, the message will be nack-ed anyway.
	//
	// Warning: It's not recommended to use this option when you are using requestreply component
	// (requestreply.NewCommandHandler or requestreply.NewCommandHandlerWithResult), as it may ack the
	// command when sending reply failed.
	//
	// When you are using requestreply, you should use requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool
	// contains filtered or unexported fields
}

func (CommandProcessorConfig) Validate added in v1.3.0

func (c CommandProcessorConfig) Validate() error

type CommandProcessorGenerateSubscribeTopicFn added in v1.3.0

type CommandProcessorGenerateSubscribeTopicFn func(CommandProcessorGenerateSubscribeTopicParams) (string, error)

type CommandProcessorGenerateSubscribeTopicParams added in v1.3.0

type CommandProcessorGenerateSubscribeTopicParams struct {
	CommandName    string
	CommandHandler CommandHandler
}

type CommandProcessorOnHandleFn added in v1.3.0

type CommandProcessorOnHandleFn func(params CommandProcessorOnHandleParams) error

type CommandProcessorOnHandleParams added in v1.3.0

type CommandProcessorOnHandleParams struct {
	Handler CommandHandler

	CommandName string
	Command     any

	// Message is never nil and can be modified.
	Message *message.Message
}
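The middleware-like role of OnHandle can be sketched without the package. In the example below, `onHandleParams` is a reduced, hypothetical copy of the params struct: it carries a context directly instead of a message, and `loggingOnHandle` and `runOnHandle` are made-up names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handler is the only part of CommandHandler this sketch needs.
type handler interface {
	Handle(ctx context.Context, cmd any) error
}

// handlerFunc lets a plain function act as a handler.
type handlerFunc func(ctx context.Context, cmd any) error

func (f handlerFunc) Handle(ctx context.Context, cmd any) error { return f(ctx, cmd) }

// onHandleParams is a reduced, hypothetical copy of CommandProcessorOnHandleParams.
type onHandleParams struct {
	Ctx         context.Context
	Handler     handler
	CommandName string
	Command     any
}

// loggingOnHandle records a line before and after the handler runs and wraps
// any error with the command name. It must call Handle itself.
func loggingOnHandle(log *[]string) func(onHandleParams) error {
	return func(params onHandleParams) error {
		*log = append(*log, "before "+params.CommandName)
		err := params.Handler.Handle(params.Ctx, params.Command)
		*log = append(*log, "after "+params.CommandName)
		if err != nil {
			return fmt.Errorf("handling %s: %w", params.CommandName, err)
		}
		return nil
	}
}

var errRoomTaken = errors.New("room already booked")

// runOnHandle runs the hook around a handler that returns handlerErr.
func runOnHandle(handlerErr error) ([]string, error) {
	var log []string
	err := loggingOnHandle(&log)(onHandleParams{
		Ctx:         context.Background(),
		Handler:     handlerFunc(func(ctx context.Context, cmd any) error { return handlerErr }),
		CommandName: "BookRoom",
		Command:     struct{}{},
	})
	return log, err
}

func main() {
	log, err := runOnHandle(errRoomTaken)
	fmt.Println(log)                          // [before BookRoom after BookRoom]
	fmt.Println(errors.Is(err, errRoomTaken)) // true
}
```

If the hook forgot to call Handle, the command would never reach its handler, which is why the call is explicit.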

type CommandProcessorSubscriberConstructorFn added in v1.3.0

type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)

CommandProcessorSubscriberConstructorFn creates subscriber for CommandHandler. It allows you to create a separate customized Subscriber for every command handler.

type CommandProcessorSubscriberConstructorParams added in v1.3.0

type CommandProcessorSubscriberConstructorParams struct {
	HandlerName string
	Handler     CommandHandler
}

type CommandsSubscriberConstructor deprecated added in v0.4.0

type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error)

CommandsSubscriberConstructor creates subscriber for CommandHandler. It allows you to create a separate customized Subscriber for every command handler.

Deprecated: please use CommandProcessorSubscriberConstructorFn instead.

type DuplicateCommandHandlerError added in v0.4.0

type DuplicateCommandHandlerError struct {
	CommandName string
}

DuplicateCommandHandlerError occurs when a handler with the same name already exists.

func (DuplicateCommandHandlerError) Error added in v0.4.0

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus transports events to event handlers.

func NewEventBus

func NewEventBus(
	publisher message.Publisher,
	generateTopic func(eventName string) string,
	marshaler CommandEventMarshaler,
) (*EventBus, error)

NewEventBus creates a new EventBus. Deprecated: use NewEventBusWithConfig instead.

func NewEventBusWithConfig added in v1.3.0

func NewEventBusWithConfig(publisher message.Publisher, config EventBusConfig) (*EventBus, error)

NewEventBusWithConfig creates a new EventBus.

func (EventBus) Publish

func (c EventBus) Publish(ctx context.Context, event any) error

Publish sends event to the event bus.

type EventBusConfig added in v1.3.0

type EventBusConfig struct {
	// GeneratePublishTopic is used to generate topic name for publishing event.
	GeneratePublishTopic GenerateEventPublishTopicFn

	// OnPublish is called before sending the event.
	// The *message.Message can be modified.
	//
	// This option is not required.
	OnPublish OnEventSendFn

	// Marshaler is used to marshal and unmarshal events.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter
}

func (EventBusConfig) Validate added in v1.3.0

func (c EventBusConfig) Validate() error

type EventGroupProcessor added in v1.3.0

type EventGroupProcessor struct {
	// contains filtered or unexported fields
}

EventGroupProcessor determines which EventHandler should handle event received from event bus. Compared to EventProcessor, EventGroupProcessor allows to have multiple handlers that share the same subscriber instance.

func NewEventGroupProcessorWithConfig added in v1.3.0

func NewEventGroupProcessorWithConfig(router *message.Router, config EventGroupProcessorConfig) (*EventGroupProcessor, error)

NewEventGroupProcessorWithConfig creates a new EventGroupProcessor.

func (*EventGroupProcessor) AddHandlersGroup added in v1.3.0

func (p *EventGroupProcessor) AddHandlersGroup(groupName string, handlers ...GroupEventHandler) error

AddHandlersGroup adds a new list of GroupEventHandler to the EventGroupProcessor and adds it to the router.

Compared to AddHandlers, AddHandlersGroup allows to have multiple handlers that share the same subscriber instance.

Each handler group name needs to be unique within the EventGroupProcessor instance.

Handler group name is used as handler's name in router.
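The effect of sharing one subscriber across a group can be pictured as a single ordered stream that is dispatched to whichever handler matches each event. The sketch below simulates that with a slice of already decoded events. `funcHandler`, `dispatchInOrder` and `replay` are hypothetical, matching by Go type is a simplification, and unmatched events are silently skipped here.

```go
package main

import (
	"context"
	"fmt"
)

// GroupEventHandler is a local copy of the interface documented on this page.
type GroupEventHandler interface {
	NewEvent() interface{}
	Handle(ctx context.Context, event interface{}) error
}

// funcHandler adapts a typed function to GroupEventHandler.
type funcHandler[T any] struct {
	fn func(ctx context.Context, event *T) error
}

func (h funcHandler[T]) NewEvent() interface{} { return new(T) }

func (h funcHandler[T]) Handle(ctx context.Context, event interface{}) error {
	return h.fn(ctx, event.(*T))
}

// dispatchInOrder simulates one shared subscription: events are processed one
// by one in arrival order, each by the handlers whose event type matches.
func dispatchInOrder(ctx context.Context, handlers []GroupEventHandler, events []interface{}) error {
	for _, ev := range events {
		name := fmt.Sprintf("%T", ev)
		for _, h := range handlers {
			if fmt.Sprintf("%T", h.NewEvent()) != name {
				continue
			}
			if err := h.Handle(ctx, ev); err != nil {
				return err
			}
		}
	}
	return nil
}

type RoomBooked struct{ RoomID string }
type RoomCancelled struct{ RoomID string }

// replay runs a fixed stream through a two-handler group and returns the log.
func replay() ([]string, error) {
	var log []string
	handlers := []GroupEventHandler{
		funcHandler[RoomBooked]{fn: func(ctx context.Context, e *RoomBooked) error {
			log = append(log, "booked "+e.RoomID)
			return nil
		}},
		funcHandler[RoomCancelled]{fn: func(ctx context.Context, e *RoomCancelled) error {
			log = append(log, "cancelled "+e.RoomID)
			return nil
		}},
	}
	events := []interface{}{
		&RoomBooked{RoomID: "101"},
		&RoomCancelled{RoomID: "101"},
		&RoomBooked{RoomID: "102"},
	}
	err := dispatchInOrder(context.Background(), handlers, events)
	return log, err
}

func main() {
	log, err := replay()
	fmt.Println(log, err) // [booked 101 cancelled 101 booked 102] <nil>
}
```

With separate subscribers per handler, the cancellation for room 101 could be processed before its booking; a shared stream keeps the two in order.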

type EventGroupProcessorConfig added in v1.3.0

type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic is used to generate topic for subscribing to events for handler groups.
	// This option is required for EventProcessor if handler groups are used.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor is used to create subscriber for GroupEventHandler.
	// This function is called for every events group once - thanks to that it's possible to have one subscription per group.
	// It's useful, when we are processing events from one stream and we want to do it in order.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle is called before handling event.
	// OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling an event.
	//
	// Because of that, you need to explicitly call params.Handler.Handle() to handle the event.
	//
	//  func(params EventGroupProcessorOnHandleParams) (err error) {
	//      // logic before handle
	//      //  (...)
	//
	//      err = params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // logic after handle
	//      //  (...)
	//
	//      return err
	//  }
	//
	// This option is not required.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined.
	AckOnUnknownEvent bool

	// Marshaler is used to marshal and unmarshal events.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter
}

func (EventGroupProcessorConfig) Validate added in v1.3.0

func (c EventGroupProcessorConfig) Validate() error

type EventGroupProcessorGenerateSubscribeTopicFn added in v1.3.0

type EventGroupProcessorGenerateSubscribeTopicFn func(EventGroupProcessorGenerateSubscribeTopicParams) (string, error)

type EventGroupProcessorGenerateSubscribeTopicParams added in v1.3.0

type EventGroupProcessorGenerateSubscribeTopicParams struct {
	EventGroupName     string
	EventGroupHandlers []GroupEventHandler
}

type EventGroupProcessorOnHandleFn added in v1.3.0

type EventGroupProcessorOnHandleFn func(params EventGroupProcessorOnHandleParams) error

type EventGroupProcessorOnHandleParams added in v1.3.0

type EventGroupProcessorOnHandleParams struct {
	GroupName string
	Handler   GroupEventHandler

	Event     any
	EventName string

	// Message is never nil and can be modified.
	Message *message.Message
}

type EventGroupProcessorSubscriberConstructorFn added in v1.3.0

type EventGroupProcessorSubscriberConstructorFn func(EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error)

type EventGroupProcessorSubscriberConstructorParams added in v1.3.0

type EventGroupProcessorSubscriberConstructorParams struct {
	EventGroupName     string
	EventGroupHandlers []GroupEventHandler
}

type EventHandler

type EventHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to EventsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result in **reconsuming all messages**!
	HandlerName() string

	NewEvent() any

	Handle(ctx context.Context, event any) error
}

EventHandler receives events defined by NewEvent and handles them with its Handle method. If using DDD, EventHandler may modify and persist the aggregate. It can also invoke a process manager, a saga or just build a read model.

In contrast to CommandHandler, every Event can have multiple EventHandlers.

One instance of EventHandler is used during handling messages. When multiple events are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!

func NewEventHandler added in v1.3.0

func NewEventHandler[T any](
	handlerName string,
	handleFunc func(ctx context.Context, event *T) error,
) EventHandler

NewEventHandler creates a new EventHandler implementation based on provided function and event type inferred from function argument.

type EventProcessor

type EventProcessor struct {
	// contains filtered or unexported fields
}

EventProcessor determines which EventHandler should handle event received from event bus.

func NewEventProcessor

func NewEventProcessor(
	individualHandlers []EventHandler,
	generateTopic func(eventName string) string,
	subscriberConstructor EventsSubscriberConstructor,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) (*EventProcessor, error)

NewEventProcessor creates a new EventProcessor. Deprecated. Use NewEventProcessorWithConfig instead.

func NewEventProcessorWithConfig added in v1.3.0

func NewEventProcessorWithConfig(router *message.Router, config EventProcessorConfig) (*EventProcessor, error)

NewEventProcessorWithConfig creates a new EventProcessor.

func (*EventProcessor) AddHandlers added in v1.3.0

func (p *EventProcessor) AddHandlers(handlers ...EventHandler) error

AddHandlers adds a new EventHandler to the EventProcessor and adds it to the router.

func (EventProcessor) AddHandlersToRouter

func (p EventProcessor) AddHandlersToRouter(r *message.Router) error

AddHandlersToRouter adds the EventProcessor's handlers to the given router. It should be called only once per EventProcessor instance.

It is required to call AddHandlersToRouter only if event processor is created with NewEventProcessor (disableRouterAutoAddHandlers is set to true). Deprecated: please migrate to event processor created by NewEventProcessorWithConfig.

func (EventProcessor) Handlers

func (p EventProcessor) Handlers() []EventHandler

type EventProcessorConfig added in v1.3.0

type EventProcessorConfig struct {
	// GenerateSubscribeTopic is used to generate topic for subscribing to events.
	// If handler groups are used, EventGroupProcessorConfig.GenerateSubscribeTopic is used instead.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor is used to create subscriber for EventHandler.
	//
	// This function is called for every EventHandler instance.
	// If you want to re-use one subscriber for multiple handlers, use EventGroupProcessor instead.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle is called before handling event.
	// OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling an event.
	//
	// Because of that, you need to explicitly call params.Handler.Handle() to handle the event.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // logic before handle
	//      //  (...)
	//
	//      err = params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // logic after handle
	//      //  (...)
	//
	//      return err
	//  }
	//
	// This option is not required.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined.
	AckOnUnknownEvent bool

	// Marshaler is used to marshal and unmarshal events.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter
	// contains filtered or unexported fields
}

func (EventProcessorConfig) Validate added in v1.3.0

func (c EventProcessorConfig) Validate() error

type EventProcessorGenerateSubscribeTopicFn added in v1.3.0

type EventProcessorGenerateSubscribeTopicFn func(EventProcessorGenerateSubscribeTopicParams) (string, error)

type EventProcessorGenerateSubscribeTopicParams added in v1.3.0

type EventProcessorGenerateSubscribeTopicParams struct {
	EventName    string
	EventHandler EventHandler
}

type EventProcessorOnHandleFn added in v1.3.0

type EventProcessorOnHandleFn func(params EventProcessorOnHandleParams) error

type EventProcessorOnHandleParams added in v1.3.0

type EventProcessorOnHandleParams struct {
	Handler EventHandler

	Event     any
	EventName string

	// Message is never nil and can be modified.
	Message *message.Message
}

type EventProcessorSubscriberConstructorFn added in v1.3.0

type EventProcessorSubscriberConstructorFn func(EventProcessorSubscriberConstructorParams) (message.Subscriber, error)

type EventProcessorSubscriberConstructorParams added in v1.3.0

type EventProcessorSubscriberConstructorParams struct {
	HandlerName  string
	EventHandler EventHandler
}

type EventsSubscriberConstructor added in v0.4.0

type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error)

EventsSubscriberConstructor creates a subscriber for EventHandler. It allows you to create a separate customized Subscriber for every event handler.

When handler groups are used, handler group is passed as handlerName. Deprecated: please use EventProcessorSubscriberConstructorFn instead.

type Facade deprecated

type Facade struct {
	// contains filtered or unexported fields
}

Deprecated: use CommandHandler and EventHandler instead.

Facade is a facade for creating the Command and Event buses and processors. It was created to avoid boilerplate, when using CQRS in the standard way. You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.

func NewFacade deprecated

func NewFacade(config FacadeConfig) (*Facade, error)

Deprecated: use CommandHandler and EventHandler instead.

func (Facade) CommandBus

func (f Facade) CommandBus() *CommandBus

func (Facade) CommandEventMarshaler

func (f Facade) CommandEventMarshaler() CommandEventMarshaler

func (Facade) EventBus

func (f Facade) EventBus() *EventBus

type FacadeConfig deprecated

type FacadeConfig struct {
	// GenerateCommandsTopic generates topic name based on the command name.
	// Command name is generated by CommandEventMarshaler's Name method.
	//
	// It allows you to use topic per command or one topic for every command.
	GenerateCommandsTopic func(commandName string) string

	// CommandHandlers return command handlers which should be executed.
	CommandHandlers func(commandBus *CommandBus, eventBus *EventBus) []CommandHandler

	// CommandsPublisher is Publisher used to publish commands.
	CommandsPublisher message.Publisher

	// CommandsSubscriberConstructor is constructor for subscribers which will subscribe for messages.
	// It will be called for every command handler.
	// It allows you to create separated customized Subscriber for every command handler.
	CommandsSubscriberConstructor CommandsSubscriberConstructor

	// GenerateEventsTopic generates topic name based on the event name.
	// Event name is generated by CommandEventMarshaler's Name method.
	//
	// It allows you to use topic per event or one topic for every event.
	GenerateEventsTopic func(eventName string) string

	// EventHandlers return event handlers which should be executed.
	EventHandlers func(commandBus *CommandBus, eventBus *EventBus) []EventHandler

	// EventsPublisher is Publisher used to publish events.
	EventsPublisher message.Publisher

	// EventsSubscriberConstructor is constructor for subscribers which will subscribe for messages.
	// It will be called for every event handler.
	// It allows you to create separated customized Subscriber for every event handler.
	EventsSubscriberConstructor EventsSubscriberConstructor

	// Router is a Watermill router, which will be used to handle events and commands.
	// Router handlers will be automatically generated by AddHandlersToRouter of Command and Event handlers.
	Router *message.Router

	CommandEventMarshaler CommandEventMarshaler

	Logger watermill.LoggerAdapter
}

Deprecated: use CommandProcessor and EventProcessor instead.

func (FacadeConfig) CommandsEnabled

func (c FacadeConfig) CommandsEnabled() bool

func (FacadeConfig) EventsEnabled

func (c FacadeConfig) EventsEnabled() bool

func (FacadeConfig) Validate

func (c FacadeConfig) Validate() error

type GenerateEventPublishTopicFn added in v1.3.0

type GenerateEventPublishTopicFn func(GenerateEventPublishTopicParams) (string, error)

type GenerateEventPublishTopicParams added in v1.3.0

type GenerateEventPublishTopicParams struct {
	EventName string
	Event     any
}

type GroupEventHandler added in v1.3.0

type GroupEventHandler interface {
	NewEvent() interface{}
	Handle(ctx context.Context, event interface{}) error
}

func NewGroupEventHandler added in v1.3.0

func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler

NewGroupEventHandler creates a new GroupEventHandler implementation based on provided function and event type inferred from function argument.

type JSONMarshaler

type JSONMarshaler struct {
	NewUUID      func() string
	GenerateName func(v interface{}) string
}

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(v interface{}) (*message.Message, error)

func (JSONMarshaler) Name

func (m JSONMarshaler) Name(cmdOrEvent interface{}) string

func (JSONMarshaler) NameFromMessage

func (m JSONMarshaler) NameFromMessage(msg *message.Message) string

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)

type NoProtoMessageError

type NoProtoMessageError struct {
	// contains filtered or unexported fields
}

NoProtoMessageError is returned when the given value does not implement proto.Message.

func (NoProtoMessageError) Error

func (e NoProtoMessageError) Error() string

type NonPointerError

type NonPointerError struct {
	Type reflect.Type
}

func (NonPointerError) Error

func (e NonPointerError) Error() string

type OnEventSendFn added in v1.3.0

type OnEventSendFn func(params OnEventSendParams) error

type OnEventSendParams added in v1.3.0

type OnEventSendParams struct {
	EventName string
	Event     any

	// Message is never nil and can be modified.
	Message *message.Message
}

type ProtobufMarshaler

type ProtobufMarshaler struct {
	NewUUID      func() string
	GenerateName func(v interface{}) string
}

ProtobufMarshaler is the default Protocol Buffers marshaler.

func (ProtobufMarshaler) Marshal

func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error)

Marshal marshals the given protobuf's message into watermill's Message.

func (ProtobufMarshaler) Name

func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string

Name returns the command or event's name.

func (ProtobufMarshaler) NameFromMessage

func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string

NameFromMessage returns the metadata name value for a given Message.

func (ProtobufMarshaler) Unmarshal

func (ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)

Unmarshal unmarshals given watermill's Message into protobuf's message.
