Documentation
¶
Index ¶
- Constants
- Variables
- func Command(ctx context.Context, handler CommandHandler, opts ...co.Options)
- func Event(ctx context.Context, fn ManagerEventApplier, opts ...eo.Options) error
- func GetCommand(aggreate, command string) any
- func GetEvent(aggreate, event string) any
- func JetStream(ctx context.Context) (nats.JetStreamContext, bool)
- func MustUnmarshal[T any](data []byte) T
- func Nats(ctx context.Context) (*nats.Conn, bool)
- func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error
- func PublishCommand(ctx context.Context, cmd *gen.CommandEnvelope, payload any) error
- func Query(ctx context.Context, fn Querier, opts ...qo.Options) error
- func RegisterCommand[T any](aggreate, command string)
- func RegisterEvent[T any](aggreate, event string)
- func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) error
- func ReplayAndSubscribe[T EventApplier](ctx context.Context, agg T, opts ...ro.Options) <-chan T
- func Unmarshal[T any](data []byte) (T, error)
- func UnmarshalCommand(c *gen.CommandEnvelope) (any, error)
- func UnmarshalEvent(e *gen.EventEnvelope) (any, error)
- func WithJetStream(ctx context.Context, js nats.JetStreamContext) context.Context
- func WithNats(ctx context.Context, nc *nats.Conn) context.Context
- type CommandHandler
- type CommandProcessor
- type EventApplier
- type ManagerEventApplier
- type Projector
- type Querier
- type ReplayHandler
Constants ¶
const DeliverAll = 0
Variables ¶
var CommandsPrefix = "cmds"
var EventsPrefix = "events"
var QueryPrefix = "query"
Functions ¶
func Command ¶
func Command(ctx context.Context, handler CommandHandler, opts ...co.Options)
Command is the main entry point for processing commands. accepts co.Options to configure the command processor. co.WithSubject - use custom subject instead of default "cmds.aggregate" co.WithAggregate - use custom aggregate name
func GetCommand ¶
GetCommand retrieves a command instance based on the aggregate type and command type. It checks if the aggregate and command are registered, and if so, it calls the corresponding function to create a new instance of the command type. If the aggregate or command is not registered, it returns nil. This function is useful for dynamically handling commands in a type-safe manner.
func GetEvent ¶
GetEvent retrieves an event instance based on the aggregate type and event type. It checks if the aggregate and event are registered, and if so, it calls the corresponding function to create a new instance of the event type. If the aggregate or event is not registered, it returns nil. This function is useful for dynamically handling events in a type-safe manner.
func JetStream ¶
func JetStream(ctx context.Context) (nats.JetStreamContext, bool)
JetStream retrieves the JetStream context from the context.
func MustUnmarshal ¶ added in v0.0.5
MustUnmarshal unmarshals JSON data into a struct of type T. It panics if unmarshaling fails, so it should be used when you are sure that the data is valid and will not cause an error. This is useful for tests or when you want to ensure that the data is always valid.
func Project ¶
Project subscribes to events for a specific aggregate and applies them using the provided EventApplier function. It uses JetStream to manage the event stream and durable subscriptions. The function takes a context, an EventApplier function, and optional configuration options. The configuration options allow customization of the aggregate type, aggregate ID, subject, durable name, and prefix for the subscription. po.WithSubject sets the subject for the subscription po.WithAggreate sets the aggregate type for the subscription po.WithAggrateID sets the aggregate ID for the subscription po.WithPrefix sets a prefix for the durable name po.WithDurable sets the durable name for the subscription
func PublishCommand ¶
PublishCommand publishes a command to the JetStream server. It takes a context, a CommandEnvelope, and an optional payload. If the payload is not nil, it marshals the payload into JSON and sets it in the CommandEnvelope. It retrieves the JetStream context from the context and publishes the command to the subject "cmds.<AggregateType>" with the serialized CommandEnvelope as the message body. If the JetStream context is not initialized, it returns an error. The function returns an error if the publish operation fails.
func Query ¶
Querier is a function type that takes a QueryEnvelope and returns a result or an error. qo.Subnect - use custom subject instead of default "query.aggregate.get" qo.Aggregate - use custom aggregate
func RegisterCommand ¶
RegisterCommand registers a command type for a specific aggregate. The type T should be a struct that represents the command. It creates a function that returns a new instance of T when called. This allows for dynamic creation of command instances based on the aggregate and command type.
func RegisterEvent ¶
RegisterEvent registers an event type for a specific aggregate. The type T should be a struct that represents the event. It creates a function that returns a new instance of T when called. This allows for dynamic creation of event instances based on the aggregate and event type.
func Replay ¶
Replay replays events for a given aggregate and aggregate ID. accepts ro.Options to configure the replay behavior. ro.WithAggregate ro.WithAggregateID - configure the aggregate and aggregate ID ro.WithSubject - use custom subject instead of default "events.aggregate.aggregateID.>" ro.WithStartSeq - start from event (if you have snapshot) ro.WtihParent - nests subjects ro.WithTimeout - timeout if no events for stream
func ReplayAndSubscribe ¶
func ReplayAndSubscribe[T EventApplier](ctx context.Context, agg T, opts ...ro.Options) <-chan T
ReplayAndSubscribe replays events for a given aggregate and aggregate ID, and subscribes to new events. It accepts ro.Options to configure the replay behavior. ro.WithAggregate ro.WithAggregateID - configure the aggregate and aggregate ID ro.WithSubject - use custom subject instead of default "events.aggregate.aggregateID.>" ro.WithStartSeq - start from event (if you have snapshot) ro.WtihParent - nests subjects ro.WithTimeout - timeout if no events for stream
func UnmarshalCommand ¶
func UnmarshalCommand(c *gen.CommandEnvelope) (any, error)
UnmarshalCommand unmarshals a JSON payload into a command instance. It takes a CommandEnvelope, checks if it has valid command type and aggregate type, and retrieves the corresponding command instance using GetCommand. If the command type or aggregate type is not registered, it returns nil. If the command instance is found, it unmarshals the JSON payload into that instance. If unmarshaling fails, it returns an error. This function is useful for converting raw command data into structured command types.
func UnmarshalEvent ¶
func UnmarshalEvent(e *gen.EventEnvelope) (any, error)
UnmarshalEvent unmarshals a JSON payload into an event instance. It takes an EventEnvelope, checks if it has valid event type and aggregate type, and retrieves the corresponding event instance using GetEvent. If the event type or aggregate type is not registered, it returns nil. If the event instance is found, it unmarshals the JSON payload into that instance. If unmarshaling fails, it returns an error. This function is useful for converting raw event data into structured event types.
func WithJetStream ¶
WithJetStream adds a JetStream context to the context.
Types ¶
type CommandHandler ¶
type CommandHandler interface {
Handle(m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
}
type CommandProcessor ¶
type CommandProcessor struct {
// contains filtered or unexported fields
}
type EventApplier ¶
type EventApplier interface {
ApplyEvent(event *gen.EventEnvelope) error
}
type ManagerEventApplier ¶ added in v0.0.14
type ManagerEventApplier interface {
Handle(event *gen.EventEnvelope) ([]*gen.CommandEnvelope, error)
}
type Projector ¶
type Projector interface {
EventApplier
Querier
}
type Querier ¶
type Querier interface {
Query(query *gen.QueryEnvelope) (interface{}, error)
}
type ReplayHandler ¶
type ReplayHandler interface {
ApplyEvent(m *gen.EventEnvelope) error
}
interface
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
natserver
command
|
|
|
publishers
command
|
|
|
query
command
|
|
|
subscribers
command
|
|