Documentation ¶
Overview ¶
Package types provides all public interfaces and supporting types for the mono-framework.
This package exists to resolve import cycles between the main mono package and internal packages. All interfaces, structs, constants, and type definitions that need to be shared between the public API and internal implementations (internal/*) are defined here.
Package Purpose ¶
The mono-framework follows an architecture where:
- mono: Public API entry point for external consumers
- mono/pkg/types: Interface and type definitions (this package)
- mono/internal/*: Implementation details
By separating interfaces into this package, internal packages can implement interfaces defined here without creating import cycles with pkg/mono.
Import Pattern ¶
Internal packages should import this package for interface definitions:
import "github.com/go-monolith/mono/pkg/types"
External consumers should continue using pkg/mono, which re-exports all types from this package for backward compatibility:
import "github.com/go-monolith/mono/pkg/mono"
Contents ¶
This package contains the following type categories:
- Framework types: MonoFramework, MonoFrameworkState, FrameworkHealth, ModuleHealth
- Module types: Module, DependentModule, ServiceProviderModule, and other module interfaces
- EventBus types: EventBus, Subscription, EventStream, Msg, Header, MsgHandler
- Logger types: Logger, LoggerFactory, LogLevel, LogFormat
- Container types: ServiceContainer, handler types, service client interfaces
- Middleware types: MiddlewareModule, MiddlewareChainRunner, lifecycle events
- JetStream types: StreamConfig, ConsumerConfig, policy enums
Zero Dependencies ¶
This package has NO dependencies on internal/* packages. It may only import:
- Standard library packages
- External dependencies (e.g., nats.go/jetstream for JetStream types)
This constraint is critical for preventing import cycles.
Index ¶
- Constants
- func FormatServiceType(serviceType ServiceType) string
- type AckPolicy
- type BaseEventDefinition
- type ConfigurationEvent
- type ConfigurationEventType
- type ConsumerConfig
- type DeliverPolicy
- type DependentModule
- type DiscardPolicy
- type EventBus
- type EventBusAwareModule
- type EventBusWithConn
- type EventConsumerEntry
- type EventConsumerHandler
- type EventConsumerModule
- type EventDefinition
- func (e EventDefinition[T]) EventStreamPublish(ctx context.Context, eventBus EventBus, event T, header Header) (MsgPubAck, error)
- func (e EventDefinition[T]) Publish(eventBus EventBus, event T, header Header) error
- func (e EventDefinition[T]) ToBase() BaseEventDefinition
- func (e EventDefinition[T]) Unmarshal(msg *Msg) (T, error)
- func (e EventDefinition[T]) WithMarshaler(m Marshaler) EventDefinition[T]
- func (e EventDefinition[T]) WithUnmarshaler(u Unmarshaler) EventDefinition[T]
- type EventEmitterModule
- type EventRegistry
- type EventStream
- type EventStreamConsumerEntry
- type EventStreamConsumerHandler
- type ExternalStream
- type FetchConfig
- type FrameworkHealth
- type Header
- type HealthCheckableModule
- type HealthStatus
- type LogFormat
- type LogLevel
- type Logger
- type LoggerFactory
- type LoggerOptions
- type Marshaler
- type MiddlewareChainRunner
- type MiddlewareModule
- type Module
- type ModuleHealth
- type ModuleLifecycleEvent
- type ModuleLifecycleEventType
- type MonoFramework
- type MonoFrameworkConfig
- type MonoFrameworkState
- type Msg
- type MsgHandler
- type MsgPubAck
- type NATSOptions
- type OutgoingMessageContext
- type Placement
- type PluginModule
- type QGHP
- type QueueGroupHandler
- type QueueGroupServiceClient
- type RePublish
- type ReplayPolicy
- type RequestReplyHandler
- type RequestReplyServiceClient
- type RetentionPolicy
- type ServiceContainer
- type ServiceEntry
- type ServiceProviderModule
- type ServiceRegistration
- type ServiceType
- type StorageType
- type StoreCompression
- type StreamConfig
- type StreamConsumerConfig
- type StreamConsumerHandler
- type StreamConsumerLimits
- type StreamConsumerServiceClient
- type StreamSource
- type SubjectTransformConfig
- type Subscription
- type TypedEventConsumerHandler
- type TypedEventStreamConsumerHandler
- type TypedQGHP
- type TypedQueueGroupHandler
- type TypedRequestReplyHandler
- type TypedStreamConsumerHandler
- type Unmarshaler
- type UsePluginModule
Constants ¶
const (
	// HeaderError indicates the response contains an error.
	// Value should be "true" when present.
	HeaderError = "Mono-Error"

	// HeaderErrorMessage contains the human-readable error message.
	HeaderErrorMessage = "Mono-Error-Message"

	// HeaderErrorType contains the error classification (e.g., "service", "timeout").
	// This is optional and helps clients handle errors differently based on type.
	HeaderErrorType = "Mono-Error-Type"
)
Reserved header names for error responses in RequestReply services. These headers are used to propagate handler errors back to the client.
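On the client side, these headers can be translated back into a Go error. The following is a minimal sketch, using a plain map[string][]string as a stand-in for the package's Header type; the helper name errorFromHeader is an illustrative assumption, not part of the package API:

```go
package main

import (
	"errors"
	"fmt"
)

// Reserved header names, mirrored from the package constants above.
const (
	HeaderError        = "Mono-Error"
	HeaderErrorMessage = "Mono-Error-Message"
	HeaderErrorType    = "Mono-Error-Type"
)

// errorFromHeader converts an error response's headers back into a Go error.
// Header is modeled here as map[string][]string, the shape of NATS headers.
func errorFromHeader(h map[string][]string) error {
	if len(h[HeaderError]) == 0 || h[HeaderError][0] != "true" {
		return nil // not an error response
	}
	msg := "unknown error"
	if v := h[HeaderErrorMessage]; len(v) > 0 {
		msg = v[0]
	}
	if v := h[HeaderErrorType]; len(v) > 0 {
		return fmt.Errorf("%s: %s", v[0], msg)
	}
	return errors.New(msg)
}

func main() {
	h := map[string][]string{
		HeaderError:        {"true"},
		HeaderErrorMessage: {"inventory unavailable"},
		HeaderErrorType:    {"service"},
	}
	fmt.Println(errorFromHeader(h)) // service: inventory unavailable
}
```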
Variables ¶
This section is empty.
Functions ¶
func FormatServiceType ¶
func FormatServiceType(serviceType ServiceType) string
FormatServiceType converts a ServiceType to its lowercase snake_case string representation for display in logs, error messages, and audit trails. Returns "unknown(N)" for unrecognized service types where N is the numeric value.
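The documented contract can be sketched as follows. This is not the package's implementation; the ServiceType values (RequestReply, QueueGroup) and their string forms are assumptions for illustration, but the "unknown(N)" fallback matches the description above:

```go
package main

import "fmt"

// ServiceType mirrors the package's enum; the concrete values are assumptions.
type ServiceType int

const (
	RequestReply ServiceType = iota
	QueueGroup
)

// formatServiceType returns a lowercase snake_case name for known types
// and "unknown(N)" for unrecognized values, as the doc comment specifies.
func formatServiceType(t ServiceType) string {
	switch t {
	case RequestReply:
		return "request_reply"
	case QueueGroup:
		return "queue_group"
	default:
		return fmt.Sprintf("unknown(%d)", int(t))
	}
}

func main() {
	fmt.Println(formatServiceType(QueueGroup))      // queue_group
	fmt.Println(formatServiceType(ServiceType(42))) // unknown(42)
}
```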
Types ¶
type AckPolicy ¶
type AckPolicy int
AckPolicy defines the acknowledgement policy for a consumer.
AckPolicy controls how the consumer must acknowledge received messages. Acknowledgements confirm to the server that a message was successfully processed. The policy affects reliability guarantees and processing semantics.
const (
	// AckExplicitPolicy requires explicit acknowledgement for every message.
	// Each message must be individually acknowledged (Ack), negatively acknowledged (Nak),
	// or terminated (Term). This is the default and recommended policy for reliability.
	// Use this for critical operations where you need to ensure each message is processed exactly once.
	// Guarantees: At-least-once delivery (unacked messages are redelivered).
	// The server tracks which messages have been acked and will not deliver them again.
	// Recommended for: Payment processing, order management, audit logs, and any operation
	// where message loss is unacceptable.
	AckExplicitPolicy AckPolicy = iota

	// AckAllPolicy acknowledges a message and all messages with lower sequence numbers implicitly.
	// When you ack message N, the server also marks messages 1..N-1 as acknowledged.
	// This is more efficient than explicit acking for large batches but less granular.
	// Use this when processing batches where you want to acknowledge all previous messages
	// without individual ack calls (reduces ack overhead).
	// Guarantees: At-least-once delivery for unacked messages.
	// Example: Process 100 messages, ack only the last one to implicitly ack all 100.
	// Warning: If processing fails partway through, retried messages may overlap.
	AckAllPolicy

	// AckNonePolicy requires no acknowledgement from the consumer.
	// The server delivers messages but doesn't wait for or track acknowledgements.
	// This is fire-and-forget mode: messages are considered delivered when sent.
	// Use this for non-critical, best-effort scenarios where occasional message loss is acceptable.
	// Guarantees: At-most-once delivery (no retries, no redelivery).
	// Recommended for: Metrics, analytics, notifications where missed messages don't impact business logic.
	// Performance: Fastest option - no ack overhead.
	// Note: Unacknowledged messages may be lost if the server restarts or crashes.
	AckNonePolicy
)
type BaseEventDefinition ¶
type BaseEventDefinition struct {
// ModuleName is the name of the module that emits this event (e.g., "order")
ModuleName string
// Name is the event name (e.g., "OrderCreated")
Name string
// Subject is the NATS subject for this event (e.g., "events.orders.v1.created")
Subject string
// Version is the semantic version of this event (e.g., "v1")
Version string
}
BaseEventDefinition is the non-generic representation of an event definition. This type is used by EventRegistry and other interfaces because Go interface methods cannot have type parameters.
Subject naming convention: events.<domain>.<version>.<event-type>
Example: events.orders.v1.created
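The subject convention above can be captured in a small helper. This is an illustrative sketch, not the framework's own subject constructor, which may derive subjects differently:

```go
package main

import (
	"fmt"
	"strings"
)

// eventSubject builds a subject following the documented convention:
// events.<domain>.<version>.<event-type>.
func eventSubject(domain, version, eventType string) string {
	return strings.Join([]string{"events", domain, version, eventType}, ".")
}

func main() {
	fmt.Println(eventSubject("orders", "v1", "created")) // events.orders.v1.created
}
```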
func (BaseEventDefinition) EventStreamPublishRaw ¶
func (e BaseEventDefinition) EventStreamPublishRaw(ctx context.Context, eventBus EventBus, data []byte, header Header) (MsgPubAck, error)
EventStreamPublishRaw publishes raw bytes to JetStream for guaranteed persistence. Unlike PublishRaw, which uses NATS core fire-and-forget publishing, this method persists the message in JetStream and returns a publish acknowledgment.
Parameters:
- ctx: Context for cancellation and timeout control
- eventBus: The EventBus to publish through
- data: The raw bytes to publish
- header: Optional message headers (can be nil)
Returns:
- MsgPubAck: Contains stream name, sequence number, and duplicate detection info
- error: Returns error if EventBus is nil, EventStream unavailable, or publish fails
Example:
data, _ := json.Marshal(orderCreatedEvent)
ack, err := baseDef.EventStreamPublishRaw(ctx, eventBus, data, nil)
if err != nil {
return err
}
fmt.Printf("Published with sequence: %d\n", ack.Sequence())
func (BaseEventDefinition) PublishRaw ¶
func (e BaseEventDefinition) PublishRaw(eventBus EventBus, data []byte, header Header) error
PublishRaw publishes raw bytes to the event bus. For type-safe publishing with automatic marshaling, use EventDefinition[T].Publish() instead.
Example:
data, _ := json.Marshal(orderCreatedEvent)
err := baseDef.PublishRaw(eventBus, data, nil)
type ConfigurationEvent ¶
type ConfigurationEvent struct {
// Type identifies the configuration event type
Type ConfigurationEventType
// OptionName is the name of the configuration option being changed
OptionName string
// OldValue is the previous value of the configuration option
OldValue any
// NewValue is the new value of the configuration option
NewValue any
// Metadata holds extensible custom data that middleware can attach
Metadata map[string]any
}
ConfigurationEvent contains data for configuration change events.
Middleware can observe or modify old/new values before they're applied. This is useful for:
- Audit logging of configuration changes
- Validating configuration values
- Redacting sensitive values
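As a sketch of the redaction use case, a middleware could blank out values for sensitive option names before the event is logged. The struct below is a local mirror of ConfigurationEvent (Metadata omitted), and the list of sensitive option names is an assumption for illustration:

```go
package main

import "fmt"

// ConfigurationEvent mirrors the struct above; Metadata is omitted for brevity.
type ConfigurationEvent struct {
	Type       string
	OptionName string
	OldValue   any
	NewValue   any
}

// redactSensitive blanks out old/new values for options whose names suggest
// secrets, the kind of in-place modification a middleware could perform.
func redactSensitive(ev *ConfigurationEvent) {
	sensitive := map[string]bool{"api_key": true, "password": true}
	if sensitive[ev.OptionName] {
		ev.OldValue = "[REDACTED]"
		ev.NewValue = "[REDACTED]"
	}
}

func main() {
	ev := ConfigurationEvent{
		Type:       "configuration.updated",
		OptionName: "api_key",
		OldValue:   "old-secret",
		NewValue:   "new-secret",
	}
	redactSensitive(&ev)
	fmt.Println(ev.OptionName, ev.NewValue) // api_key [REDACTED]
}
```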
type ConfigurationEventType ¶
type ConfigurationEventType string
ConfigurationEventType represents the type of configuration event.
const (
	// ConfigurationUpdatedEvent indicates a configuration option was changed.
	ConfigurationUpdatedEvent ConfigurationEventType = "configuration.updated"
)
type ConsumerConfig ¶
type ConsumerConfig struct {
// Name is an optional name for the consumer. If not set, one is
// generated automatically. This is the preferred field for naming consumers.
//
// If both Name and Durable are set, they must be equal.
//
// Name cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Name string `json:"name,omitempty"`
// Durable is an optional durable name for the consumer.
//
// Deprecated: Use Name instead. Durable is maintained for backward
// compatibility. If both Durable and Name are set, they must be equal.
// Unless InactiveThreshold is set, a durable consumer will not be
// cleaned up automatically.
//
// Durable cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Durable string `json:"durable_name,omitempty"`
// Description provides an optional description of the consumer.
Description string `json:"description,omitempty"`
// DeliverPolicy defines from which point to start delivering messages
// from the stream. Defaults to DeliverAllPolicy.
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
// OptStartSeq is an optional sequence number from which to start
// message delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartSequencePolicy.
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
// OptStartTime is an optional time from which to start message
// delivery. Only applicable when DeliverPolicy is set to
// DeliverByStartTimePolicy.
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
// AckPolicy defines the acknowledgement policy for the consumer.
// Defaults to AckExplicitPolicy.
AckPolicy AckPolicy `json:"ack_policy"`
// AckWait defines how long the server will wait for an acknowledgement
// before resending a message. If not set, server default is 30 seconds.
AckWait time.Duration `json:"ack_wait,omitempty"`
// MaxDeliver defines the maximum number of delivery attempts for a
// message. Applies to any message that is re-sent due to ack policy.
// If not set, server default is -1 (unlimited).
MaxDeliver int `json:"max_deliver,omitempty"`
// BackOff specifies the optional back-off intervals for retrying
// message delivery after a failed acknowledgement. It overrides
// AckWait.
//
// BackOff only applies to messages not acknowledged in specified time,
// not messages that were nack'ed.
//
// The number of intervals specified must be lower or equal to
// MaxDeliver. If the number of intervals is lower, the last interval is
// used for all remaining attempts.
BackOff []time.Duration `json:"backoff,omitempty"`
// FilterSubject can be used to filter messages delivered from the
// stream. FilterSubject is exclusive with FilterSubjects.
FilterSubject string `json:"filter_subject,omitempty"`
// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`
// RateLimit specifies an optional maximum rate of message delivery in
// bits per second.
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
// SampleFrequency is an optional frequency for sampling how often
// acknowledgements are sampled for observability. See
// https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream
SampleFrequency string `json:"sample_freq,omitempty"`
// MaxWaiting is a maximum number of pull requests waiting to be
// fulfilled. If not set, this will inherit settings from stream's
// ConsumerLimits or (if those are not set) from account settings. If
// neither are set, server default is 512.
MaxWaiting int `json:"max_waiting,omitempty"`
// MaxAckPending is a maximum number of outstanding unacknowledged
// messages. Once this limit is reached, the server will suspend sending
// messages to the consumer. If not set, server default is 1000.
// Set to -1 for unlimited.
MaxAckPending int `json:"max_ack_pending,omitempty"`
// HeadersOnly indicates whether only headers of messages should be sent
// (and no payload). Defaults to false.
HeadersOnly bool `json:"headers_only,omitempty"`
// MaxRequestBatch is the optional maximum batch size a single pull
// request can make. When set with MaxRequestMaxBytes, the batch size
// will be constrained by whichever limit is hit first.
MaxRequestBatch int `json:"max_batch,omitempty"`
// MaxRequestExpires is the maximum duration a single pull request will
// wait for messages to be available to pull.
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
// MaxRequestMaxBytes is the optional maximum total bytes that can be
// requested in a given batch. When set with MaxRequestBatch, the batch
// size will be constrained by whichever limit is hit first.
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
// InactiveThreshold is a duration which instructs the server to clean
// up the consumer if it has been inactive for the specified duration.
// Durable consumers will not be cleaned up by default, but if
// InactiveThreshold is set, they will be. If not set, this will inherit
// settings from stream's ConsumerLimits. If neither are set, server
// default is 5 seconds.
//
	// A consumer is considered inactive when no pull requests are
	// received by the server (for pull consumers), or no interest is
	// detected on the deliver subject (for push consumers), not when
	// there are no messages to be delivered.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// Replicas is the number of replicas for the consumer's state.
// If set to 0, consumers inherit the number of replicas from the stream.
Replicas int `json:"num_replicas"`
// MemoryStorage is a flag to force the consumer to use memory storage
// rather than inherit the storage type from the stream.
MemoryStorage bool `json:"mem_storage,omitempty"`
// FilterSubjects allows filtering messages from a stream by subject.
// This field is exclusive with FilterSubject. Requires nats-server
// v2.10.0 or later.
FilterSubjects []string `json:"filter_subjects,omitempty"`
// Metadata is a set of application-defined key-value pairs for
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
}
ConsumerConfig configures a JetStream consumer. This is the framework's internal type that abstracts the underlying JetStream configuration.
type DeliverPolicy ¶
type DeliverPolicy int
DeliverPolicy defines when a consumer should start delivering messages.
DeliverPolicy controls from which point in the stream a consumer begins receiving messages. This determines whether you replay historical messages or start fresh with new messages. The default is DeliverAllPolicy (start from the beginning).
const (
	// DeliverAllPolicy starts delivering from the very first message in the stream.
	// This is the default delivery policy.
	// Use this to replay the entire history: processing all messages from the beginning.
	// Recommended for: Initial data synchronization, recovery scenarios, building state from scratch.
	// Behavior: Consumer receives all messages that have ever been published, in order.
	// Warning: For large streams with millions of messages, this may be slow on first delivery.
	DeliverAllPolicy DeliverPolicy = iota

	// DeliverLastPolicy starts the consumer with only the last message in the stream.
	// Use this when you only care about the most recent state and don't need history.
	// Recommended for: Status updates, snapshots, current state consumption.
	// Behavior: Consumer skips all historical messages and only receives new messages from here on.
	// Useful for: Position tracking, latest configuration, current metrics.
	DeliverLastPolicy

	// DeliverNewPolicy only delivers messages sent after the consumer is created.
	// Use this for new consumers that should only see future messages, not historical data.
	// Recommended for: New subscribers joining a live stream, real-time notifications.
	// Behavior: All messages published before consumer creation are ignored.
	// Useful for: Alert systems, live feeds, forward-only subscribers.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy starts delivering from a specific sequence number.
	// You must set ConsumerConfig.OptStartSeq to specify the starting sequence.
	// Use this when you know the exact sequence number to start from (e.g., resuming after a crash).
	// Recommended for: Recovery with known position, resumable processing.
	// Behavior: Starts at sequence N and delivers all messages from there onward.
	// Example: Resume a crashed consumer from where it last processed.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy starts delivering from messages created after a specific timestamp.
	// You must set ConsumerConfig.OptStartTime to specify the starting time.
	// Use this when you want to replay messages from a certain point in time.
	// Recommended for: Time-based recovery, re-processing messages from the last hour/day.
	// Behavior: Starts at the first message created after the specified time.
	// Example: Replay all order events from the last 24 hours.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy starts the consumer with the last message for each subject.
	// Use this for state machines where you only need the latest update for each subject.
	// Recommended for: Key-value patterns, state snapshots with multiple subjects.
	// Behavior: For each unique subject, consumer receives only the most recent message.
	// Useful for: Loading current values of all keys, building initial state from subjects.
	// Example: Get the latest status for all 100 machines in your cluster.
	DeliverLastPerSubjectPolicy
)
type DependentModule ¶
type DependentModule interface {
Module
// Dependencies returns names of modules this module depends on.
// The returned slice must contain valid module names that will be registered.
// Dependencies are resolved during framework startup, and missing or circular
// dependencies will cause startup to fail.
//
// Dependencies are started before this module and stopped after this module.
Dependencies() []string
// SetDependencyServiceContainer provides access to the service container of a dependency.
// Called once for each dependency during module initialization, before RegisterServices.
//
// Parameters:
// - dependency: The name of the dependency module (matches a value from Dependencies())
// - container: The ServiceContainer of the dependency module
//
// Modules should save the container to an appropriately named field for later use
// during Start() or when handling requests.
SetDependencyServiceContainer(dependency string, container ServiceContainer)
}
DependentModule declares dependencies on other modules and receives their service containers.
Modules implementing this interface will be started after their dependencies and stopped before their dependencies. The framework uses this information for topological sorting to determine the correct startup order.
Dependencies are specified by module name and must match the Name() of other registered modules. Circular dependencies are detected and will cause registration to fail.
The framework calls SetDependencyServiceContainer for each dependency before calling RegisterServices or Start, allowing modules to access services provided by their dependencies.
Example:
type OrderModule struct {
inventory ServiceContainer // Set via SetDependencyServiceContainer
payment ServiceContainer // Set via SetDependencyServiceContainer
}
func (m *OrderModule) Name() string { return "order" }
func (m *OrderModule) Dependencies() []string {
return []string{"inventory", "payment"}
}
func (m *OrderModule) SetDependencyServiceContainer(dep string, container ServiceContainer) {
switch dep {
case "inventory":
m.inventory = container
case "payment":
m.payment = container
}
}
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy determines how to proceed when limits of messages or bytes are reached.
DiscardPolicy controls what happens when a stream reaches its size or message count limits. You must configure limits (MaxMsgs, MaxBytes, or MaxAge) for the discard policy to take effect. The default is DiscardOld (remove old messages to stay within limits).
const (
	// DiscardOld removes the oldest messages when stream limits are reached.
	// This is the default discard policy.
	// Use this for: Rolling windows, time-series data, circular buffers where you always keep the most recent data.
	// Behavior: When MaxMsgs or MaxBytes is exceeded, the oldest message(s) are automatically deleted.
	// Example: Keep only the last 1 million messages (when MaxMsgs=1M).
	// Recommended for: Event streams, audit logs with rolling retention, dashboards with fixed windows.
	// Guarantees: Older messages are lost first; newer messages are always kept (if possible).
	// Good for: Rolling windows of data where older data becomes irrelevant.
	DiscardOld DiscardPolicy = iota

	// DiscardNew fails to store new messages once stream limits are reached.
	// New published messages are rejected when the stream is full.
	// Use this for: Quota enforcement, preventing unbounded growth, write-once archives.
	// Behavior: When MaxMsgs or MaxBytes is reached, all new Publish calls fail until space is freed.
	// Recommended for: Strict quota systems, write-protected archives, capacity planning testing.
	// Guarantees: All existing messages are preserved; new messages may be rejected.
	// Warning: Publishers will receive errors; you must handle publish failures.
	// Good for: Immutable ledgers, write-once backups, enforcing hard limits on data growth.
	DiscardNew
)
type EventBus ¶
type EventBus interface {
// Publish publishes data to a subject (fire-and-forget)
Publish(subject string, data []byte) error
// PublishMsg publishes a complete message with headers
PublishMsg(msg *Msg) error
// Request sends a request and waits for a single reply
Request(subject string, data []byte, timeout time.Duration) (*Msg, error)
// RequestWithContext sends a request and waits for a single reply with context support.
// The context controls cancellation and deadline. If the context is cancelled or
// its deadline is exceeded, the request is aborted and an appropriate error is returned.
// This method is preferred over Request when context cancellation is needed.
RequestWithContext(ctx context.Context, subject string, data []byte) (*Msg, error)
// RequestMsgWithContext sends a complete message with headers and waits for a single reply.
// The context controls cancellation and deadline. If the context is cancelled or
// its deadline is exceeded, the request is aborted and an appropriate error is returned.
// This method allows transmitting headers along with the request payload.
// Note: The Reply field is ignored; NATS generates the reply subject automatically.
RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)
// Subscribe creates an asynchronous subscription with a message handler
Subscribe(subject string, handler MsgHandler) (Subscription, error)
// SubscribeSync creates a synchronous subscription
SubscribeSync(subject string) (Subscription, error)
// QueueSubscribe creates a queue group subscription for load balancing
QueueSubscribe(subject, queue string, handler MsgHandler) (Subscription, error)
// QueueSubscribeSync creates a synchronous queue group subscription
QueueSubscribeSync(subject, queue string) (Subscription, error)
// ChanSubscribe creates a channel-based subscription
ChanSubscribe(subject string, ch chan *Msg) (Subscription, error)
// EventStream returns an EventStream interface instance for durable/persistent subscriptions
EventStream() (EventStream, error)
// SetRuntimeContext sets the context that will be passed to all message handlers.
// When this context is cancelled, handlers can detect shutdown and terminate gracefully.
SetRuntimeContext(ctx context.Context)
}
EventBus provides event-driven communication wrapping NATS client.
The EventBus abstraction provides high-level messaging patterns including publish/subscribe, request/reply, and queue group subscriptions.
See docs/spec/foundation.md for detailed design documentation.
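The Publish/Subscribe contract can be illustrated with a toy in-memory stand-in. The memoryBus below is not the framework's implementation (which wraps a NATS client); it implements only the two simplest methods to show how handlers receive messages:

```go
package main

import (
	"fmt"
	"sync"
)

// Msg and MsgHandler mirror the package's message types in miniature.
type Msg struct {
	Subject string
	Data    []byte
}

type MsgHandler func(msg *Msg)

// memoryBus is a toy in-memory stand-in for EventBus, implementing just
// Publish and Subscribe to illustrate the pub/sub contract.
type memoryBus struct {
	mu   sync.Mutex
	subs map[string][]MsgHandler
}

func newMemoryBus() *memoryBus {
	return &memoryBus{subs: map[string][]MsgHandler{}}
}

func (b *memoryBus) Subscribe(subject string, h MsgHandler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[subject] = append(b.subs[subject], h)
}

func (b *memoryBus) Publish(subject string, data []byte) error {
	b.mu.Lock()
	handlers := append([]MsgHandler(nil), b.subs[subject]...)
	b.mu.Unlock()
	for _, h := range handlers {
		h(&Msg{Subject: subject, Data: data})
	}
	return nil
}

func main() {
	bus := newMemoryBus()
	bus.Subscribe("events.order.created", func(m *Msg) {
		fmt.Printf("got %s: %s\n", m.Subject, m.Data)
	})
	bus.Publish("events.order.created", []byte(`{"order_id":"123"}`))
}
```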
type EventBusAwareModule ¶
type EventBusAwareModule interface {
Module
// SetEventBus provides the event bus for low-level pub/sub, queue, jetstream operations.
// Called during framework initialization before RegisterServices and Start.
//
// Modules should store the EventBus in a field for later use. The EventBus can be
// used to:
// - Publish events: bus.Publish(subject, data)
// - Subscribe to events: bus.Subscribe(subject, handler)
// - Use queue groups: bus.QueueSubscribe(subject, queue, handler)
// - Make request-reply calls: bus.Request(subject, data, timeout)
SetEventBus(bus EventBus)
}
EventBusAwareModule receives an EventBus instance during framework initialization.
Modules implementing this interface gain access to the EventBus for low-level pub/sub messaging, queue subscriptions, and JetStream operations. This is useful for modules that need to:
- Publish domain events to other modules
- Subscribe to events from other modules
- Use queue groups for load balancing
- Use JetStream for durable messaging
SetEventBus is called during framework initialization, after NATS server is started but before RegisterServices.
Example:
type NotificationModule struct {
eventBus EventBus
}
func (m *NotificationModule) SetEventBus(bus EventBus) {
m.eventBus = bus
}
func (m *NotificationModule) Start(ctx context.Context) error {
// Subscribe to order events
_, err := m.eventBus.Subscribe("events.order.created", func(msg *Msg) {
// Send notification
})
return err
}
type EventBusWithConn ¶
type EventBusWithConn[T any] interface {
	Conn() T
}
EventBusWithConn is a generic interface for accessing the underlying connection from an EventBus implementation.
This interface provides an escape hatch for plugins and advanced modules that need direct access to the underlying event bus driver (e.g., for JetStream operations, custom NATS features not exposed through EventBus).
IMPORTANT: Most modules should NOT use this interface. Standard module communication should use EventBus methods (Publish, Request, Subscribe) or services registered through ServiceContainer. Only use this for:
- Plugin modules needing JetStream/ObjectStore/KV access
- Advanced features requiring driver-specific APIs
- Integration with third-party NATS libraries
Example usage:
if provider, ok := eventBus.(types.EventBusWithConn[*nats.Conn]); ok {
conn := provider.Conn()
// Use NATS connection for JetStream operations
js, err := jetstream.New(conn)
}
type EventConsumerEntry ¶
type EventConsumerEntry struct {
// EventDef is the event definition being consumed
EventDef BaseEventDefinition
// Handler is the consumer handler function
Handler EventConsumerHandler
// Module is the module that registered this consumer
Module Module
// QueueGroup is the NATS queue group name for load balancing.
// Defaults to the consumer module's name if not explicitly provided.
// Multiple consumers in the same queue group will share event processing (load balancing).
QueueGroup string
}
EventConsumerEntry tracks an event consumer registration. The framework collects these entries from the EventRegistry during setupNATSSubscriptions and creates the actual NATS subscriptions.
type EventConsumerHandler ¶
type EventConsumerHandler func(ctx context.Context, msg *Msg) error
EventConsumerHandler is the signature for event consumer handlers. It follows the same pattern as QueueGroupHandler for consistency.
The handler receives the runtime context and the message containing the event data. For type-safe handlers with automatic unmarshaling, use TypedEventConsumerHandler[T] with RegisterTypedConsumer instead.
Example:
handler := func(ctx context.Context, msg *Msg) error {
var event OrderCreatedEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
return err
}
// Process event...
return nil
}
type EventConsumerModule ¶
type EventConsumerModule interface {
Module
// RegisterEventConsumers registers event consumers for this module.
// Called after RegisterServices but before Start.
//
// The registry provides event discovery and consumer registration:
// - GetEventByName(name, version, moduleName) - find specific event
// - GetEventsByModule(moduleName) - list all events from a module
// - GetAllEvents() - list all registered events
// - RegisterEventConsumer(eventDef, handler, module) - register handler
//
// If RegisterEventConsumers returns an error, framework initialization will fail
// and no modules will be started.
RegisterEventConsumers(registry EventRegistry) error
}
EventConsumerModule registers event consumers during framework initialization.
Modules implementing this interface can register handlers for events emitted by other modules. This follows the same pattern as ServiceProviderModule - the framework calls the method directly for registration rather than providing a setter.
RegisterEventConsumers is called during framework initialization:
- After SetDependencyServiceContainer (if DependentModule)
- After SetEventBus (if EventBusAwareModule)
- After RegisterServices (if ServiceProviderModule)
- After all EventEmitterModules have registered their events
- Before Start()
This ensures:
- All services are registered before event consumers are set up
- All event definitions are available for discovery
- Consumer registration happens in a dedicated phase
Example:
type NotificationModule struct {
// No need to store eventRegistry as a field
}
func (m *NotificationModule) RegisterEventConsumers(registry EventRegistry) error {
// Discover the event
eventDef, ok := registry.GetEventByName("OrderCreated", "v1", "order")
if !ok {
return fmt.Errorf("event not found: OrderCreated.v1 from order")
}
// Register consumer handler
return registry.RegisterEventConsumer(eventDef, m.handleOrderCreated, m)
}
type EventDefinition ¶
type EventDefinition[T any] struct {
	BaseEventDefinition // embedded - fields are promoted
	// contains filtered or unexported fields
}
EventDefinition is a generic event definition with type-safe publish and consume support. The type parameter T represents the event payload type.
EventDefinition embeds BaseEventDefinition for field reuse. Fields like ModuleName, Name, Subject, and Version are accessed directly via Go's field promotion.
Subject naming convention: events.<domain>.<version>.<event-type> Example: events.orders.v1.created
Example usage:
// OrderCreatedV1 defines the event for order creation.
var OrderCreatedV1 = helper.EventDefinition[OrderCreatedEvent](
"order", "OrderCreated", "v1",
)
// Type-safe publish:
event := OrderCreatedEvent{OrderID: "123", Amount: 99.99}
err := OrderCreatedV1.Publish(eventBus, event, nil)
func NewEventDefinition ¶
func NewEventDefinition[T any](moduleName, name, version string, subject ...string) EventDefinition[T]
NewEventDefinition creates a new generic EventDefinition with JSON as the default serialization format.
NOTE: This is the internal constructor that does not validate the subject. Use helper.EventDefinition from pkg/mono for the validated version.
Parameters:
- moduleName: The name of the module that emits this event (e.g., "order")
- name: The event name (e.g., "OrderCreated")
- version: The semantic version of this event (e.g., "v1")
- subject: The NATS subject for this event (e.g., "events.orders.v1.created") - optional, variadic
Example:
var OrderCreatedV1 = helper.EventDefinition[OrderCreatedEvent](
"order", // moduleName
"OrderCreated", // name
"v1", // version
"events.orders.v1.created", // subject (optional)
)
Example ¶
ExampleNewEventDefinition demonstrates creating a typed event definition.
package main
import (
"fmt"
"github.com/go-monolith/mono/pkg/types"
)
// sampleEvent is an example event payload for demonstration.
type sampleEvent struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
func main() {
eventDef := types.NewEventDefinition[sampleEvent](
"order", // moduleName
"OrderCreated", // name
"v1", // version
"events.order.v1.order-created", // subject
)
fmt.Println("Event name:", eventDef.Name)
fmt.Println("Event version:", eventDef.Version)
fmt.Println("Event subject:", eventDef.Subject)
}
Output:
Event name: OrderCreated
Event version: v1
Event subject: events.order.v1.order-created
func (EventDefinition[T]) EventStreamPublish ¶
func (e EventDefinition[T]) EventStreamPublish(ctx context.Context, eventBus EventBus, event T, header Header) (MsgPubAck, error)
EventStreamPublish serializes the event and publishes it to JetStream for guaranteed persistence. Unlike Publish which uses NATS core fire-and-forget publishing, this method persists the message in JetStream and returns a publish acknowledgment.
Parameters:
- ctx: Context for cancellation and timeout control
- eventBus: The EventBus to publish through
- event: The typed event payload to publish
- header: Optional message headers (can be nil)
Returns:
- MsgPubAck: Contains stream name, sequence number, and duplicate detection info
- error: Returns error if marshaling fails, EventBus is nil, or publish fails
Example:
event := OrderCreatedEvent{OrderID: "123", Amount: 99.99}
ack, err := OrderCreatedV1.EventStreamPublish(ctx, eventBus, event, nil)
if err != nil {
return err
}
fmt.Printf("Published with sequence: %d\n", ack.Sequence())
func (EventDefinition[T]) Publish ¶
func (e EventDefinition[T]) Publish(eventBus EventBus, event T, header Header) error
Publish serializes the event and publishes it to the event bus. This is the type-safe method that automatically marshals the event.
Example:
event := OrderCreatedEvent{OrderID: "123", Amount: 99.99}
err := OrderCreatedV1.Publish(eventBus, event, nil)
func (EventDefinition[T]) ToBase ¶
func (e EventDefinition[T]) ToBase() BaseEventDefinition
ToBase returns the embedded BaseEventDefinition for use with the EventRegistry interface.
Example:
func (m *Module) EmitEvents() []mono.BaseEventDefinition {
return []mono.BaseEventDefinition{
OrderCreatedV1.ToBase(),
OrderShippedV1.ToBase(),
}
}
func (EventDefinition[T]) Unmarshal ¶
func (e EventDefinition[T]) Unmarshal(msg *Msg) (T, error)
Unmarshal deserializes the message data into the event type. This is useful in consumer handlers when you have access to the event definition.
Example:
func (m *Module) handleOrderCreated(ctx context.Context, msg *mono.Msg) error {
event, err := order.OrderCreatedV1.Unmarshal(msg)
if err != nil {
return err
}
// Process event...
return nil
}
func (EventDefinition[T]) WithMarshaler ¶
func (e EventDefinition[T]) WithMarshaler(m Marshaler) EventDefinition[T]
WithMarshaler sets a custom marshaler for the event definition. Returns a copy with the marshaler set.
Example:
var OrderCreatedV1 = helper.EventDefinition[OrderCreatedEvent](...).
WithMarshaler(protobufMarshal)
func (EventDefinition[T]) WithUnmarshaler ¶
func (e EventDefinition[T]) WithUnmarshaler(u Unmarshaler) EventDefinition[T]
WithUnmarshaler sets a custom unmarshaler for the event definition. Returns a copy with the unmarshaler set.
Example:
var OrderCreatedV1 = helper.EventDefinition[OrderCreatedEvent](...).
WithUnmarshaler(protobufUnmarshal)
type EventEmitterModule ¶
type EventEmitterModule interface {
EventBusAwareModule
// EmitEvents returns all base event definitions this module can emit.
// Called during framework initialization (after SetEventBus, before Start).
// The method name clearly indicates the module's intent: declaring what events it emits.
//
// Use eventDef.ToBase() to convert generic EventDefinition[T] to BaseEventDefinition.
EmitEvents() []BaseEventDefinition
}
EventEmitterModule registers event definitions with the framework.
Modules implementing this interface can declare events they will emit during operation. The framework collects these event definitions during initialization and makes them available for discovery by consumer modules via the EventRegistry. This interface embeds EventBusAwareModule so the module can publish its declared events to the EventBus.
EmitEvents is called during framework initialization:
After SetEventBus (requires EventBusAwareModule)
- Before module Start()
- Before consumer modules can register event handlers
This ensures all event definitions are available when consumer modules start.
Example:
type OrderModule struct {
eventBus EventBus
}
// OrderCreatedV1 is emitted when a new order is created.
var OrderCreatedV1 = helper.EventDefinition[OrderCreatedEvent](
"order", "OrderCreated", "v1",
)
func (m *OrderModule) EmitEvents() []BaseEventDefinition {
return []BaseEventDefinition{
OrderCreatedV1.ToBase(),
}
}
type EventRegistry ¶
type EventRegistry interface {
// RegisterEvent registers an event definition (called by EventEmitterModule).
// Returns an error if an event with the same name, version, and module already exists.
RegisterEvent(def BaseEventDefinition) error
// GetEventsByModule returns all events registered by a specific module.
GetEventsByModule(moduleName string) []BaseEventDefinition
// GetEventByName returns the event definition by name, version, and module name.
// Parameter order: name first (what), version second (which version), moduleName third (from where).
// Returns the event definition and true if found, or an empty definition and false if not found.
GetEventByName(name string, version string, moduleName string) (BaseEventDefinition, bool)
// GetAllEvents returns all registered event definitions.
GetAllEvents() []BaseEventDefinition
// RegisterEventConsumer registers a consumer for an event (called by consumer modules during Start).
// This DOES NOT create the NATS subscription - it only registers the intent.
// Framework will later call Entries() and set up NATS subscriptions.
// The queueGroup parameter is optional (variadic). If not provided or empty, defaults to module.Name().
// Multiple consumers with the same queue group will load balance event processing.
// Important: EventConsumer does not detect "no responder" errors. Events may be
// lost if no consumer is available at publish time.
RegisterEventConsumer(eventDef BaseEventDefinition, handler EventConsumerHandler, module Module, queueGroup ...string) error
// Entries returns all registered event consumers.
// Framework uses this during setupNATSSubscriptions phase.
// Named to be consistent with ServiceContainer.Entries().
Entries() []EventConsumerEntry
// RegisterEventStreamConsumer registers a JetStream durable consumer for an event.
// The config.Stream.Subjects will be overridden with []string{eventDef.Subject}.
// This provides durable, at-least-once delivery for event consumers using JetStream.
//
// Unlike RegisterEventConsumer which uses NATS core pub/sub (fire-and-forget),
// EventStreamConsumer persists messages in JetStream and supports message acknowledgment,
// redelivery on failure, and durable subscriptions.
//
// Example:
//
// config := mono.StreamConsumerConfig{
// Stream: mono.StreamConfig{
// Name: "order-events",
// Retention: mono.WorkQueuePolicy,
// },
// Fetch: mono.FetchConfig{BatchSize: 10},
// }
// err := registry.RegisterEventStreamConsumer(orderCreatedV1.ToBase(), config, handler, m)
RegisterEventStreamConsumer(eventDef BaseEventDefinition, config StreamConsumerConfig, handler EventStreamConsumerHandler, module Module) error
// StreamConsumerEntries returns all registered event stream consumers.
// Framework uses this during setupNATSSubscriptions phase to create JetStream streams and consumers.
StreamConsumerEntries() []EventStreamConsumerEntry
// SetMiddlewareChain sets the middleware chain for event consumer registration interception.
// This allows middleware to wrap event consumer handlers or modify entries before they are stored.
// Called by the lifecycle manager after building the middleware chain.
SetMiddlewareChain(chain MiddlewareChainRunner)
}
EventRegistry manages event definitions and consumer registrations for the framework's event-driven communication system.
EventRegistry serves as the central catalog for all events in the application, enabling:
- Event producers to declare the events they will emit
- Event consumers to discover and subscribe to events without direct dependencies
- The framework to wire up NATS subscriptions automatically
Unlike ServiceContainer which creates explicit module dependencies, EventRegistry enables loose coupling: emitters don't know their consumers, and consumers don't declare dependencies on emitters. This makes events ideal for broadcast notifications and decoupled architectures.
Initialization Sequence ¶
The registry is used during framework initialization in this order:
- EventEmitterModules register their event definitions via RegisterEvent()
- EventConsumerModules receive the registry via RegisterEventConsumers(registry)
- Consumer modules discover events via GetEventByName() during RegisterEventConsumers()
- Consumers register handlers via RegisterEventConsumer() or RegisterEventStreamConsumer()
- Framework creates NATS subscriptions from Entries() and StreamConsumerEntries()
Event Discovery Pattern ¶
Consumer modules implement EventConsumerModule to receive the registry and register handlers. Events can be discovered by name, version, and emitter module without importing the emitter:
func (m *NotificationModule) RegisterEventConsumers(registry mono.EventRegistry) error {
// Discover the event by name, version, and emitting module
eventDef, found := registry.GetEventByName("OrderCreated", "v1", "order")
if !found {
return fmt.Errorf("event not found: OrderCreated.v1 from order")
}
// Register a fire-and-forget consumer (NATS Core)
err := registry.RegisterEventConsumer(eventDef, m.handleOrderCreated, m)
if err != nil {
return err
}
// Or register a durable consumer (JetStream) for critical events
config := mono.StreamConsumerConfig{
Stream: mono.StreamConfig{Name: "notifications"},
Fetch: mono.FetchConfig{BatchSize: 10},
}
return registry.RegisterEventStreamConsumer(eventDef, config, m.handleOrderBatch, m)
}
Consumer Types ¶
EventConsumer: Fire-and-forget via NATS Core. Low latency (~1ms), no persistence. Use for real-time notifications where occasional message loss is acceptable.
EventStreamConsumer: Durable via JetStream. At-least-once delivery with ack/nack. Use for critical events like payments, audits, or compliance where loss is unacceptable.
See EventConsumerModule for the interface that consumer modules implement. See EventEmitterModule for the interface that producer modules implement.
The registry is thread-safe for concurrent access.
type EventStream ¶
type EventStream interface {
// Publish publishes a message to JetStream synchronously
Publish(ctx context.Context, subject string, data []byte) (MsgPubAck, error)
// PublishMsg publishes a complete mono.Msg to JetStream synchronously
PublishMsg(ctx context.Context, msg *Msg) (MsgPubAck, error)
// CreateOrUpdateStream creates or updates a stream (idempotent operation)
CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (jetstream.Stream, error)
// CreateOrUpdateConsumer creates or updates a consumer on a stream (idempotent operation)
CreateOrUpdateConsumer(ctx context.Context, streamName string, cfg ConsumerConfig) (jetstream.Consumer, error)
// Stream returns a stream handle for advanced operations
Stream(ctx context.Context, name string) (jetstream.Stream, error)
// DeleteStream deletes a stream
DeleteStream(ctx context.Context, name string) error
}
EventStream provides Stream operations for durable, persistent messaging (via JetStream).
JetStream adds persistence, replay, and at-least-once delivery semantics on top of core NATS functionality. This interface uses internal StreamConfig and ConsumerConfig types to abstract the underlying JetStream implementation.
type EventStreamConsumerEntry ¶
type EventStreamConsumerEntry struct {
// EventDef is the event definition being consumed
EventDef BaseEventDefinition
// Config is the StreamConsumerConfig with Stream.Subjects overridden to eventDef.Subject
Config StreamConsumerConfig
// Handler is the batch consumer handler function
Handler EventStreamConsumerHandler
// Module is the module that registered this consumer (the consuming module).
// This is NOT the module that emits the event - for that, use EventDef.ModuleName.
Module Module
// SequenceID is a unique identifier for this consumer registration.
// Used to ensure unique JetStream consumer names when multiple consumers
// subscribe to the same event. Set automatically during registration.
SequenceID int
}
EventStreamConsumerEntry tracks a JetStream event stream consumer registration. The framework collects these entries from the EventRegistry during setupNATSSubscriptions and creates the JetStream streams and consumers.
type EventStreamConsumerHandler ¶
EventStreamConsumerHandler processes batches of messages from a JetStream pull consumer. The handler receives a slice of messages that should be individually acknowledged.
Messages should be acknowledged using methods like Ack(), Nak(), NakWithDelay(), Term(), or InProgress(). This is similar to StreamConsumerHandler but used for event-based JetStream consumers.
Example:
handler := func(ctx context.Context, msgs []*Msg) error {
for _, msg := range msgs {
var event OrderCreatedEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
msg.Nak()
continue
}
// Process event...
msg.Ack()
}
return nil
}
type ExternalStream ¶
type ExternalStream struct {
// APIPrefix is the subject prefix that imports the other account/domain
// $JS.API.CONSUMER.> subjects.
APIPrefix string `json:"api"`
// DeliverPrefix is the delivery subject to use for the push consumer.
DeliverPrefix string `json:"deliver"`
}
ExternalStream allows you to qualify access to a stream source in another account.
type FetchConfig ¶
type FetchConfig struct {
// BatchSize is the maximum number of messages to fetch per batch.
// If not set, defaults to 10.
BatchSize int `json:"batch_size,omitempty"`
// Timeout is the maximum time to wait for messages per fetch operation.
// If not set, defaults to 5 seconds.
Timeout time.Duration `json:"timeout,omitempty"`
}
FetchConfig configures the fetch behavior for JetStream pull consumers. These settings control how messages are fetched in batches.
Example ¶
ExampleFetchConfig demonstrates configuring JetStream fetch behavior.
package main
import (
"fmt"
"time"
"github.com/go-monolith/mono/pkg/types"
)
func main() {
config := types.FetchConfig{
BatchSize: 10,
Timeout: 5 * time.Second,
}
fmt.Println("Batch size:", config.BatchSize)
fmt.Println("Timeout:", config.Timeout)
}
Output:
Batch size: 10
Timeout: 5s
type FrameworkHealth ¶
type FrameworkHealth struct {
// Healthy indicates if the framework AND all modules are healthy.
// This is true only if the framework is running, NATS is healthy,
// and all modules that implement HealthCheckableModule report healthy.
Healthy bool `json:"healthy"`
// State represents the current framework lifecycle state
State MonoFrameworkState `json:"state"`
// NATSHealthy indicates if the NATS server is operational
NATSHealthy bool `json:"nats_healthy"`
// Modules contains health status of each registered module, keyed by module name.
// Modules that don't implement HealthCheckableModule will have SupportsHealth=false.
Modules map[string]ModuleHealth `json:"modules"`
// Timestamp is when this health check was performed
Timestamp time.Time `json:"timestamp"`
// Message provides additional context if the framework is unhealthy.
// Empty if Healthy is true.
Message string `json:"message,omitempty"`
}
FrameworkHealth represents the aggregated health status of the framework. It includes the overall framework state, NATS health, and health of all modules.
type Header ¶
Header represents NATS message headers.
Headers are key-value pairs attached to NATS messages for metadata propagation. Each header key maps to a slice of values, supporting multiple values per key (following HTTP header semantics).
Common header use cases:
- Request tracing: "x-request-id", "x-trace-id" for distributed tracing
- Correlation: "x-correlation-id" to link related messages
- Routing: Custom headers for message routing logic
- Metadata: "x-timestamp", "x-source" for context information
Headers are optional and survive NATS request-reply round trips. When publishing messages with headers, they are preserved through the NATS network.
Example:
msg := &mono.Msg{
Subject: "events.order.created",
Data: orderData,
Header: mono.Header{
"x-request-id": []string{"req-12345"},
"x-trace-id": []string{"trace-98765"},
},
}
eventBus.PublishMsg(msg)
type HealthCheckableModule ¶
type HealthCheckableModule interface {
Module
// Health returns the current health status.
// Implementations should:
// - Complete quickly (< 100ms) or respect the context deadline
// - Check critical dependencies and resources
// - Return detailed information in the Details map for debugging
// - Handle panics internally and return unhealthy status
//
// The framework may call Health periodically or on-demand (e.g., health check endpoints).
Health(ctx context.Context) HealthStatus
}
HealthCheckableModule provides health status.
Modules implementing this interface can report their health status to the framework. The framework aggregates module health and exposes it through the framework's Health() method.
Health checks should be fast (< 100ms) and should check critical dependencies:
- Database connection status
- External service connectivity
- Resource utilization (memory, disk)
- Internal state validity
Example:
func (m *DatabaseModule) Health(ctx context.Context) HealthStatus {
if err := m.db.PingContext(ctx); err != nil {
return HealthStatus{
Healthy: false,
Message: "database connection failed",
Details: map[string]any{"error": err.Error()},
}
}
return HealthStatus{Healthy: true, Message: "operational"}
}
type HealthStatus ¶
type HealthStatus struct {
// Healthy indicates if the module is operating normally
Healthy bool `json:"healthy"`
// Message provides a human-readable status description
// Examples: "operational", "database connection lost", "degraded performance"
Message string `json:"message,omitempty"`
// Details provides additional debugging information
// Can include metrics, error details, dependency status, etc.
Details map[string]any `json:"details,omitempty"`
}
HealthStatus represents module health. Used by HealthCheckableModule to report operational status.
Example ¶
ExampleHealthStatus demonstrates creating health status.
package main
import (
"fmt"
"github.com/go-monolith/mono/pkg/types"
)
func main() {
status := types.HealthStatus{
Healthy: true,
Message: "All systems operational",
Details: map[string]any{
"database": "connected",
"cache": "connected",
"connections": 42,
},
}
fmt.Println("Healthy:", status.Healthy)
fmt.Println("Message:", status.Message)
fmt.Printf("Details count: %d\n", len(status.Details))
}
Output:
Healthy: true
Message: All systems operational
Details count: 3
Example (Unhealthy) ¶
ExampleHealthStatus_unhealthy demonstrates reporting unhealthy status.
package main
import (
"fmt"
"github.com/go-monolith/mono/pkg/types"
)
func main() {
status := types.HealthStatus{
Healthy: false,
Message: "Database connection lost",
Details: map[string]any{
"error": "connection refused",
"last_active": "2025-01-01T12:00:00Z",
},
}
fmt.Println("Healthy:", status.Healthy)
fmt.Println("Message:", status.Message)
fmt.Printf("Details count: %d\n", len(status.Details))
}
Output:
Healthy: false
Message: Database connection lost
Details count: 2
type LogLevel ¶
type LogLevel int
LogLevel represents logging severity levels.
Example ¶
ExampleLogLevel demonstrates the log level constants.
package main
import (
"fmt"
"github.com/go-monolith/mono/pkg/types"
)
func main() {
levels := []types.LogLevel{
types.LogLevelDebug,
types.LogLevelInfo,
types.LogLevelWarn,
types.LogLevelError,
}
for _, level := range levels {
fmt.Println(level)
}
}
Output:
0
1
2
3
type Logger ¶
type Logger interface {
// Debug logs a debug-level message
Debug(msg string, args ...any)
// Info logs an info-level message
Info(msg string, args ...any)
// Warn logs a warning-level message
Warn(msg string, args ...any)
// Error logs an error-level message
Error(msg string, args ...any)
// With returns a new logger with additional context fields
With(args ...any) Logger
// WithModule returns a new logger with module name context
WithModule(moduleName string) Logger
// WithError returns a new logger with error context
WithError(err error) Logger
}
Logger provides structured logging with levels and context.
The Logger interface wraps log/slog for framework-wide structured logging with module-specific context.
See docs/spec/foundation.md for detailed design documentation.
type LoggerFactory ¶
type LoggerFactory interface {
// NewLogger creates a logger for a specific module
NewLogger(moduleName string) Logger
// SetLevel sets the global log level
SetLevel(level LogLevel)
// GetLevel returns the current log level
GetLevel() LogLevel
}
LoggerFactory creates logger instances.
type LoggerOptions ¶
type LoggerOptions struct {
Level LogLevel
Format LogFormat
Output io.Writer
AddSource bool
UseDefault bool // If true, use default logger and ignore other options
}
LoggerOptions holds logger configuration.
type Marshaler ¶
Marshaler defines the function signature for event serialization. The default is json.Marshal, but can be overridden for custom formats (e.g., Protobuf).
type MiddlewareChainRunner ¶
type MiddlewareChainRunner interface {
// RunServiceRegistration runs a service registration through the middleware chain.
// Each middleware can modify the registration before it's stored.
RunServiceRegistration(ctx context.Context, reg ServiceRegistration) ServiceRegistration
// RunOutgoingMessage runs an outgoing message through the middleware chain.
// Each middleware can modify the message before it's sent.
// The final context is returned after all middleware have processed it.
RunOutgoingMessage(octx OutgoingMessageContext) OutgoingMessageContext
// RunEventConsumerRegistration runs an event consumer registration through the middleware chain.
// Each middleware can wrap the handler or modify the entry before it's stored.
RunEventConsumerRegistration(ctx context.Context, entry EventConsumerEntry) EventConsumerEntry
// RunEventStreamConsumerRegistration runs an event stream consumer registration through the middleware chain.
// Each middleware can wrap the handler or modify the entry before it's stored.
RunEventStreamConsumerRegistration(ctx context.Context, entry EventStreamConsumerEntry) EventStreamConsumerEntry
}
MiddlewareChainRunner executes middleware chains for different event types.
This interface is implemented by internal/middleware.Chain and is used by the service container to run service registration through the middleware chain.
This is an internal interface used for dependency injection between internal packages. Users should not implement this interface directly.
type MiddlewareModule ¶
type MiddlewareModule interface {
Module
// OnModuleLifecycle intercepts module lifecycle events.
// Middleware can observe or modify event metadata before it continues through the chain.
// Returns the (possibly modified) event for the next middleware.
//
// Example uses:
// - Audit logging of module start/stop
// - Adding custom metadata
// - Tracking module lifecycle timing
OnModuleLifecycle(ctx context.Context, event ModuleLifecycleEvent) ModuleLifecycleEvent
// OnServiceRegistration intercepts service registration.
// Middleware can wrap handlers (decorator pattern) or modify service configuration.
// Returns the (possibly modified) registration for the next middleware.
//
// Example uses:
// - Wrapping handlers for auto-encoding/decoding
// - Adding authentication/authorization layers
// - Injecting logging or metrics
// - Modifying stream consumer configuration
OnServiceRegistration(ctx context.Context, reg ServiceRegistration) ServiceRegistration
// OnConfigurationChange intercepts configuration updates.
// Middleware can observe or modify configuration values before they're applied.
// Returns the (possibly modified) event for the next middleware.
//
// Example uses:
// - Audit logging of config changes
// - Validating configuration values
// - Redacting sensitive values from logs
OnConfigurationChange(ctx context.Context, event ConfigurationEvent) ConfigurationEvent
// OnOutgoingMessage intercepts outgoing messages from service clients.
// Middleware can modify the message (e.g., inject headers) before it's sent.
// Returns the (possibly modified) context for the next middleware.
// The Msg field in the context can be modified in-place.
//
// Example uses:
// - Injecting request IDs or trace IDs into message headers
// - Adding authentication tokens
// - Logging outgoing message metadata
// - Encrypting message payloads
OnOutgoingMessage(octx OutgoingMessageContext) OutgoingMessageContext
// OnEventConsumerRegistration intercepts event consumer registration.
// Middleware can wrap handlers (decorator pattern) or modify the entry.
// Returns the (possibly modified) entry for the next middleware.
//
// Example uses:
// - Wrapping handlers for access logging with timing
// - Adding request ID extraction/injection
// - Injecting authentication/authorization layers
// - Adding metrics collection
OnEventConsumerRegistration(ctx context.Context, entry EventConsumerEntry) EventConsumerEntry
// OnEventStreamConsumerRegistration intercepts event stream consumer registration.
// Middleware can wrap handlers (decorator pattern) or modify the entry.
// Returns the (possibly modified) entry for the next middleware.
//
// This is similar to OnEventConsumerRegistration but for JetStream-based
// durable consumers that process message batches.
//
// Example uses:
// - Wrapping handlers for access logging with timing
// - Adding request ID extraction/injection for batch processing
// - Injecting authentication/authorization layers
// - Adding metrics collection for batch operations
OnEventStreamConsumerRegistration(ctx context.Context, entry EventStreamConsumerEntry) EventStreamConsumerEntry
}
MiddlewareModule provides interceptor hooks for framework events.
Middleware modules are auto-discovered by the lifecycle manager via type assertion and chained in registration order. Each hook receives event data and can modify it before passing to the next middleware in the chain.
Middleware modules have special lifecycle ordering:
- Start: Called BEFORE regular modules start
- Stop: Called AFTER regular modules stop
This ensures middleware can observe and modify the entire framework lifecycle.
Example implementations:
- Audit logging: Observe events without modification
- Handler wrapping: Wrap service handlers for auto-encoding/decoding
- Config modification: Override service configuration
See docs/spec/foundation.md for detailed design documentation.
type Module ¶
type Module interface {
// Name returns a unique identifier for the module.
// The name must be unique across all registered modules and should use kebab-case
// (e.g., "user-auth", "order-processing").
// This name is used for dependency resolution and service registration.
Name() string
// Start initializes the module and prepares it to receive requests.
// The context can be used to respect cancellation during initialization.
// Modules should:
// - Initialize resources (connections, caches, workers)
// - Set up internal state
// - Start background goroutines if needed
//
// Start is called after all dependencies have been started and after
// RegisterServices (if ServiceProviderModule) has been called.
//
// If Start returns an error, the framework will:
// - Stop all previously started modules in reverse order
// - Not call this module's Stop method
// - Return the error to the caller
//
// Modules start receiving messages only after Start() completes successfully.
Start(ctx context.Context) error
// Stop gracefully shuts down the module.
// The context can be used to enforce shutdown timeouts.
// Modules should:
// - Stop accepting new requests
// - Complete in-flight requests (respecting context deadline)
// - Release resources (connections, files, workers)
// - Stop background goroutines
//
// Stop is called in reverse dependency order, meaning this module's
// dependencies are still running when Stop is called.
//
// Modules stop receiving messages only after Stop() finishes.
// Stop should be idempotent and safe to call multiple times.
Stop(ctx context.Context) error
}
Module is the interface that all modules must implement.
Modules are the fundamental building blocks of the framework, providing isolated functionality with lifecycle management. Each module has a unique name and implements Start/Stop lifecycle methods.
The framework manages module lifecycle in dependency order:
- Start: called in dependency order (dependencies start first)
- Stop: called in reverse dependency order (dependencies stop last)
Module implementations must be safe for concurrent access to their exported methods.
See docs/spec/foundation.md for detailed design documentation.
Example (Minimal) ¶
ExampleModule_minimal demonstrates implementing the minimal Module interface.
package main

import (
	"fmt"
)

func main() {
	// Modules implement Name(), Start(), and Stop()
	// This is the minimum required for any module
	fmt.Println("Modules implement Name(), Start(), and Stop()")
}
Output: Modules implement Name(), Start(), and Stop()
type ModuleHealth ¶
type ModuleHealth struct {
// Name is the module identifier (matches Module.Name())
Name string `json:"name"`
// Healthy indicates if the module is operating normally.
// Always false if SupportsHealth is false (module doesn't implement HealthAwareModule).
Healthy bool `json:"healthy"`
// Message provides human-readable status description
Message string `json:"message,omitempty"`
// Details contains additional debugging information from the module's Health() method.
// Nil if SupportsHealth is false.
Details map[string]any `json:"details,omitempty"`
// SupportsHealth indicates if the module implements HealthAwareModule.
// If false, the Healthy field is always false and Details will be nil.
SupportsHealth bool `json:"supports_health"`
}
ModuleHealth represents health status of a single module.
type ModuleLifecycleEvent ¶
type ModuleLifecycleEvent struct {
// Type identifies the lifecycle event type
Type ModuleLifecycleEventType
// ModuleName is the name of the module (present for all events)
ModuleName string
// Duration is the module startup time (only for ModuleStartedEvent)
Duration time.Duration
// Error is the module stop error (only for ModuleStoppedEvent, nil if successful)
Error error
// Metadata holds extensible custom data that middleware can attach.
// This allows middleware to communicate through the chain.
Metadata map[string]any
}
ModuleLifecycleEvent contains data for module lifecycle events.
Different event types populate different fields:
- ModuleStartedEvent: ModuleName, Duration
- ModuleStoppedEvent: ModuleName, Error (nil if successful)
The Metadata field is available for all event types and allows middleware to attach custom data that flows through the chain.
type ModuleLifecycleEventType ¶
type ModuleLifecycleEventType string
ModuleLifecycleEventType represents the type of module lifecycle event.
const (
	// ModuleStartedEvent indicates a module was started successfully.
	ModuleStartedEvent ModuleLifecycleEventType = "module.started"

	// ModuleStoppedEvent indicates a module was stopped.
	ModuleStoppedEvent ModuleLifecycleEventType = "module.stopped"
)
type MonoFramework ¶
type MonoFramework interface {
// Register adds a module to the framework
Register(module Module) error
// RegisterPlugin registers a plugin module with an alias.
// The alias uniquely identifies the plugin and can differ from the module name.
// This allows multiple instances of the same plugin type under different aliases.
// Must be called before Start().
//
// Plugins start before middleware modules and stop after all other modules.
// They are excluded from the dependency graph and middleware hooks.
//
// Example:
//
// err := app.RegisterPlugin(storagePlugin, "primary-storage")
// err = app.RegisterPlugin(backupPlugin, "backup-storage")
RegisterPlugin(plugin PluginModule, alias string) error
// Plugin returns the plugin registered under the given alias.
// Returns nil if no plugin with that alias exists.
Plugin(alias string) PluginModule
// Start initializes and starts the framework and all registered modules
Start(ctx context.Context) error
// Stop gracefully shuts down all modules and the framework
Stop(ctx context.Context) error
// Services returns the service container of a module
Services(moduleName string) ServiceContainer
// EventBus returns the event bus used by a module
EventBus(moduleName string) EventBus
// Modules returns list of registered module names
Modules() []string
// Health returns the aggregated health status of the framework and all modules
Health(ctx context.Context) FrameworkHealth
// Logger returns the framework's internal logger.
// Modules can use this to obtain a logger instance for their own logging needs.
Logger() Logger
}
MonoFramework is the main entry point for the monolith framework. It manages module lifecycle, configuration, and inter-module communication.
See docs/spec/foundation.md for detailed design documentation.
type MonoFrameworkConfig ¶
type MonoFrameworkConfig struct {
// NATSOptions configures the embedded NATS server
NATSOptions NATSOptions
// LoggerOptions configures the logger factory
LoggerOptions LoggerOptions
// Logger is the logger instance used by the framework and modules
Logger Logger
// ShutdownTimeout is the maximum time to wait for graceful shutdown
ShutdownTimeout time.Duration
// QueueGroupOptimisticWindow configures the optimistic publish window for queue group services.
// When > 0, after a successful ACK, subsequent sends within this window use fire-and-forget publish.
// When 0 (default), always use ACK mode (disabled).
QueueGroupOptimisticWindow time.Duration
}
MonoFrameworkConfig holds framework configuration. While exported for use with functional options, instances should not be created directly. Use NewMonoApplication with functional options instead.
type MonoFrameworkState ¶
type MonoFrameworkState int
MonoFrameworkState represents the current state of the framework
const (
	StateCreated MonoFrameworkState = iota
	StateStarting
	StateRunning
	StateStopping
	StateStopped
)
func (MonoFrameworkState) String ¶
func (s MonoFrameworkState) String() string
String returns a human-readable string representation of the framework state.
type Msg ¶
type Msg struct {
Subject string
Reply string
Data []byte
Header Header
Sub *Subscription
// NatsMsg holds the underlying NATS message for acknowledgment.
// WARNING: This field is exported for internal eventbus implementations only.
// Application code should NOT access or modify this field directly.
NatsMsg any
}
Msg represents a message in the NATS system.
This unified message type supports both regular NATS messaging and JetStream messaging:
- For regular NATS: Subject, Reply, Data, and Header are populated
- For JetStream: Subject, Data, and Header are populated; Reply is typically empty
- JetStream methods (Ack, Nak, NakWithDelay, Term, InProgress) work for JetStream messages
- JetStream methods are no-ops for regular NATS messages
func (*Msg) Ack ¶
Ack acknowledges the message when using JetStream. For non-JetStream messages, this is a no-op.
func (*Msg) InProgress ¶
InProgress indicates that work is still in progress when using JetStream. For non-JetStream messages, this is a no-op.
func (*Msg) Nak ¶
Nak negatively acknowledges the message when using JetStream. For non-JetStream messages, this is a no-op.
func (*Msg) NakWithDelay ¶
NakWithDelay negatively acknowledges the message with a delay when using JetStream. For non-JetStream messages, this is a no-op.
type MsgHandler ¶
MsgHandler is a callback function for processing messages asynchronously in a subscription.
The handler is invoked whenever a message arrives on the subscribed subject. The context parameter contains the subscription's runtime context and can be used to detect graceful shutdown. When the context is cancelled, the subscription should terminate processing.
Error handling: The handler should not return an error; it should handle errors internally (logging, retrying, etc.). Errors in handlers do not affect the subscription - the handler is responsible for error recovery.
Performance note: Handlers are invoked asynchronously in a dedicated goroutine. Blocking operations in the handler will not block other message processing or the event loop.
Context cancellation: When the EventBus's runtime context is cancelled (typically during graceful shutdown), handlers may receive calls with a cancelled context. Handlers should check context.Err() and terminate cleanly.
Example:
handler := func(ctx context.Context, msg *mono.Msg) {
	// Check for shutdown
	if ctx.Err() != nil {
		return // Context cancelled, terminate gracefully
	}

	// Process the message
	var event MyEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		// Handle decode error internally
		log.Printf("decode error: %v", err)
		return
	}

	// Process event
	if err := processEvent(ctx, &event); err != nil {
		// Handle processing error internally
		log.Printf("processing error: %v", err)
	}
}
type MsgPubAck ¶
type MsgPubAck interface {
// Stream returns the name of the stream the message was published to
Stream() string
// Sequence returns the sequence number assigned to the message
Sequence() uint64
// Duplicate returns whether this message was detected as a duplicate
Duplicate() bool
// Domain returns the JetStream domain (for super-cluster deployments)
Domain() string
}
MsgPubAck represents a publish acknowledgment from JetStream. This interface abstracts the underlying JetStream PubAck for loose coupling.
SAFETY NOTE: This is an interface type. When implementing methods that return MsgPubAck:
- Always return nil (not a wrapped nil pointer) on error paths
- Always wrap a valid non-nil value on success paths
- Example:

	if err != nil {
		return nil, err // ✓ Correct: returns nil interface
	}
	return &msgPubAck{ack: validAck}, nil // ✓ Correct: wraps non-nil value
The nil interface gotcha (returning a typed nil) can cause subtle bugs where the returned value is non-nil but contains a nil pointer, which fails nil checks.
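The gotcha is easy to reproduce. The sketch below uses a local stand-in for MsgPubAck (the msgPubAck struct and its field are illustrative) to show why a typed nil fails == nil checks:

```go
package main

import "fmt"

// MsgPubAck stands in for the documented interface.
type MsgPubAck interface {
	Stream() string
}

type msgPubAck struct{ stream string }

func (a *msgPubAck) Stream() string { return a.stream }

// bad returns a typed nil: the interface value holds
// (type=*msgPubAck, value=nil), so it is NOT equal to nil.
func bad() MsgPubAck {
	var a *msgPubAck // nil pointer
	return a
}

// good returns a true nil interface on the error path.
func good(fail bool) MsgPubAck {
	if fail {
		return nil // nil interface: passes == nil checks
	}
	return &msgPubAck{stream: "EVENTS"}
}

func main() {
	fmt.Println(bad() == nil)      // false - the gotcha
	fmt.Println(good(true) == nil) // true - correct error path
}
```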
type NATSOptions ¶
type NATSOptions struct {
Host string
Port int
DontListen bool // If true, server won't listen on TCP (useful for in-process only). Requires UseInProcessConn=true.
UseInProcessConn bool // If true, client uses in-process connection instead of TCP. Can be used independently or with DontListen.
JetStreamEnabled bool // Track if JetStream is requested
JetStreamDomain string // JetStream domain for multi-tenancy
JetStreamDir string // JetStream storage directory
ClusterName string // NATS cluster name
ClusterHost string // NATS cluster host for inter-node communication
ClusterPort int // NATS cluster port for inter-node communication
ClusterRoutes []string // NATS cluster routes (URLs to other cluster nodes)
MaxPayload int32 // Maximum NATS message payload size
// NATS server logging flags (passed to SetLoggerV2)
LogDebug bool // If true, enables debug-level NATS server logging
LogTrace bool // If true, enables trace-level NATS server logging
LogSysTrace bool // If true, enables system trace logging (internal NATS operations)
// ConfigFile is the path to a NATS server configuration file.
// When specified, the file is processed using server.ProcessConfigFile() during Start().
// Programmatic options (like WithNATSPort) override settings from the config file.
ConfigFile string
}
NATSOptions holds NATS server configuration.
type OutgoingMessageContext ¶
type OutgoingMessageContext struct {
// ServiceType identifies the type of service sending the message
ServiceType ServiceType
// ServiceName is the name of the service
ServiceName string
// ModuleName is the name of the module that registered this service
ModuleName string
// Subject is the NATS subject the message will be sent to
Subject string
// Msg is the message being sent (middleware can modify headers/data in-place)
Msg *Msg
// Ctx is the context from the client call (may contain request ID, trace ID, etc.)
Ctx context.Context
// Metadata holds extensible custom data that middleware can attach
Metadata map[string]any
}
OutgoingMessageContext contains data for outgoing message interception.
This context is passed to middleware when a service client is about to send a message. Middleware can modify the message headers, data, or other fields before the message is sent.
The Msg field can be modified in-place. Middleware should ensure that Msg.Header is not nil before adding headers.
type Placement ¶
type Placement struct {
// Cluster is the name of the cluster to which the stream should be
// assigned.
Cluster string `json:"cluster"`
// Tags are used to match streams to servers in the cluster. A stream
// will be assigned to a server with a matching tag.
Tags []string `json:"tags,omitempty"`
}
Placement is used to guide placement of streams in clustered JetStream.
type PluginModule ¶
type PluginModule interface {
Module
// SetContainer is called by the framework to inject the plugin's
// dedicated ServiceContainer during initialization.
// This is called before Start() and before middleware modules start.
//
// The container is already bound to the plugin module when this method
// is called, and the EventBus is set on the container.
SetContainer(container ServiceContainer)
// Container returns the plugin's ServiceContainer.
// This allows other modules to access the plugin's registered services
// via the plugin instance they receive through SetPlugin().
Container() ServiceContainer
}
PluginModule is a special module type that starts before middleware modules and stops after all other modules. Plugins receive their own dedicated ServiceContainer and are excluded from the dependency graph and middleware hooks.
Plugins are designed for shared infrastructure that other modules need to access, such as storage backends, caching layers, or external service clients. Unlike regular modules, plugins:
- Start first and stop last (guaranteed initialization order)
- Are excluded from the dependency graph (don't participate in circular dependency checks)
- Receive their own ServiceContainer (can register services other modules access)
- Are excluded from middleware hooks (raw access without interception)
Plugin modules are registered with an alias via RegisterPlugin(), allowing multiple instances of the same plugin type to be registered under different aliases. This enables scenarios like having "primary-storage" and "backup-storage" plugins using the same underlying implementation with different configurations.
When to Use Plugins ¶
Use PluginModule when you need:
- Shared infrastructure accessed by multiple modules (e.g., file storage, cache)
- Resources that must be ready before any module starts
- Multiple instances of the same resource type with different configs
- Bypass middleware for low-level operations
Lifecycle Order ¶
Startup: Plugins -> Middleware -> Regular modules (dependency order)
Shutdown: Regular modules (reverse) -> Middleware (reverse) -> Plugins (reverse)
Built-in Plugin ¶
See the fs-jetstream plugin in plugin/fs-jetstream/ for a production-ready example that provides file storage capabilities using JetStream ObjectStore.
Example ¶
type FileStoragePlugin struct {
	container ServiceContainer
}

func (p *FileStoragePlugin) Name() string { return "fs-jetstream" }

func (p *FileStoragePlugin) SetContainer(container ServiceContainer) {
	p.container = container
}

func (p *FileStoragePlugin) Container() ServiceContainer {
	return p.container
}

func (p *FileStoragePlugin) Start(ctx context.Context) error {
	// Plugin initialization
	return nil
}

func (p *FileStoragePlugin) Stop(ctx context.Context) error {
	// Plugin cleanup
	return nil
}
type QGHP ¶
type QGHP struct {
QueueGroup string // Queue group name (e.g., "high-priority-workers")
Handler QueueGroupHandler // Handler function for this queue group
}
QGHP (QueueGroupHandler pair) associates a queue group name with its handler. Multiple pairs can be registered for a single service, allowing different queue groups to process messages from the same subject.
Example ¶
ExampleQGHP demonstrates creating queue group handler pairs.
package main

import (
	"context"
	"fmt"

	"github.com/go-monolith/mono/pkg/types"
)

func main() {
	handler := func(_ context.Context, msg *types.Msg) error {
		fmt.Println("Processing message:", string(msg.Data))
		return nil
	}
	pair := types.QGHP{
		QueueGroup: "worker-pool",
		Handler:    handler,
	}
	fmt.Println("Queue group:", pair.QueueGroup)
	fmt.Printf("Has handler: %v\n", pair.Handler != nil)
}

Output:
Queue group: worker-pool
Has handler: true
type QueueGroupHandler ¶
QueueGroupHandler processes fire-and-forget messages from a queue group.
type QueueGroupServiceClient ¶
type QueueGroupServiceClient interface {
// Send sends a message payload to the queue group and waits for ACK.
// Returns ErrServiceUnavailable if no handlers are online.
Send(ctx context.Context, data []byte) error
// SendMsg sends a raw message to the queue group and waits for ACK.
// Returns ErrServiceUnavailable if no handlers are online.
SendMsg(ctx context.Context, msg *Msg) error
}
QueueGroupServiceClient sends fire-and-forget messages to a queue group.
type RePublish ¶
type RePublish struct {
// Source is the subject pattern to match incoming messages against.
Source string `json:"src,omitempty"`
// Destination is the subject pattern to republish the subject to.
Destination string `json:"dest"`
// HeadersOnly is a flag to indicate that only the headers should be
// republished.
HeadersOnly bool `json:"headers_only,omitempty"`
}
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
type ReplayPolicy ¶
type ReplayPolicy int
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
ReplayPolicy controls the rate at which historical messages are delivered to the consumer. This affects how quickly you can replay old messages and test with realistic timing. The default is ReplayInstantPolicy (as fast as possible).
const (
	// ReplayInstantPolicy replays messages as fast as possible.
	// This is the default replay policy.
	// Messages are delivered at network speed, regardless of their original publish time gaps.
	// Use this for: Quick recovery, bulk data processing, testing with realistic message volumes.
	// Behavior: Consumer receives historical messages as fast as the server can send them.
	// Performance: Maximizes throughput, returns to live messages faster.
	// Warning: Tests with this policy won't accurately reflect real production timing.
	ReplayInstantPolicy ReplayPolicy = iota

	// ReplayOriginalPolicy maintains the original timing between messages.
	// The server reproduces the original timing intervals between message publishes.
	// Use this for: Realistic testing, simulating production traffic in dev/staging environments.
	// Behavior: If messages were originally published 1 second apart, they are delivered 1 second apart.
	// Recommended for: Load testing, performance testing, understanding time-sensitive logic.
	// Performance: Slower than Instant (takes real time to replay), but realistic.
	// Example: Replay 24 hours of production data in 24 real hours to test behavior.
	ReplayOriginalPolicy
)
type RequestReplyHandler ¶
RequestReplyHandler processes request messages and returns response data.
type RequestReplyServiceClient ¶
type RequestReplyServiceClient interface {
// Call sends a request payload and waits for a response
Call(ctx context.Context, data []byte) (*Msg, error)
// CallMsg sends a raw request message and waits for a response
CallMsg(ctx context.Context, msg *Msg) (*Msg, error)
}
RequestReplyServiceClient sends requests and receives responses.
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy defines the retention policy for a stream.
RetentionPolicy controls how long messages are kept in the stream. The default is LimitsPolicy. Different policies suit different use cases:
- Use LimitsPolicy for event logs and audit trails (time/size-based retention)
- Use InterestPolicy for event streams (retain while consumers are subscribed)
- Use WorkQueuePolicy for task queues (delete after processing)
const (
	// LimitsPolicy retains messages until limits (MaxAge, MaxBytes, MaxMsgs) are reached.
	// This is the default retention policy.
	// Use this for event logs, audit trails, and compliance scenarios where you need
	// messages to persist for a specified time period or size limit regardless of consumers.
	// Messages are automatically deleted when they exceed the configured age, size, or count limits.
	// Does not require active consumers.
	LimitsPolicy RetentionPolicy = iota

	// InterestPolicy retains messages only while there are active consumers subscribed.
	// Once all consumers are removed, the stream deletes messages.
	// Use this for real-time notification systems where you don't need historical data
	// and want to save storage space by discarding messages when no one is listening.
	// This is more storage-efficient for streams where messages are only relevant while
	// there are active subscribers. Does not support adding consumers after stream creation
	// if there are no initial consumers.
	InterestPolicy

	// WorkQueuePolicy deletes messages immediately after they are acknowledged by a consumer.
	// This is designed for task/work queue patterns where each message is processed once and
	// then discarded. Use this for distributed work queues where you don't need to replay messages
	// and storage should be minimal. Perfect for async job processing, distributed task runners,
	// and fire-and-forget work patterns. Messages that are not acknowledged will be retried.
	WorkQueuePolicy
)
type ServiceContainer ¶
type ServiceContainer interface {
// BindModule binds this container to a module
BindModule(module Module) error
// SetEventBus sets the EventBus for NATS-based services.
// This must be called before registering RequestReply, QueueGroup, or StreamConsumer services.
SetEventBus(bus EventBus)
// SetQueueGroupOptimisticWindow configures the optimistic publish window for queue group services.
// When window > 0, queue group clients will use fire-and-forget publish after a successful ACK.
// When window = 0 (default), all sends use ACK mode.
SetQueueGroupOptimisticWindow(window time.Duration)
// SetMiddlewareChain sets the middleware chain for service registration interception.
// This must be called before registering any services if middleware is needed.
SetMiddlewareChain(chain MiddlewareChainRunner)
// RegisterChannelService registers a bidirectional Go channel service
RegisterChannelService(name string, in chan *Msg, out chan *Msg) error
// RegisterRequestReplyService registers a request-reply service over NATS
RegisterRequestReplyService(name string, handler RequestReplyHandler) error
// RegisterQueueGroupService registers a queue group service with multiple handlers and acknowledgment.
// All handlers share the same subject (services.<module>.<service>) but use different queue groups.
// Handlers send an ACK before processing to enable "no responder" error detection.
RegisterQueueGroupService(name string, pairs ...QGHP) error
// RegisterStreamConsumerService registers a JetStream durable pull consumer service.
// The service will automatically create/update the stream and consumer on startup.
// The handler receives batches of messages that should be individually acknowledged.
RegisterStreamConsumerService(name string, config StreamConsumerConfig, handler StreamConsumerHandler) error
// GetChannelService retrieves a channel service by name with a per-consumer out channel.
// The consumerModule parameter identifies the calling module and ensures it receives
// a dedicated out channel, preventing race conditions when multiple modules consume
// from the same service.
GetChannelService(serviceName string, consumerModule string) (in chan *Msg, out chan *Msg, err error)
// MustGetChannelService retrieves a channel service and panics if not found.
// The consumerModule parameter identifies the calling module and ensures it receives
// a dedicated out channel.
MustGetChannelService(serviceName string, consumerModule string) (in chan *Msg, out chan *Msg)
// GetRequestReplyService retrieves a request-reply service client
GetRequestReplyService(name string) (RequestReplyServiceClient, error)
// GetQueueGroupService retrieves a queue group service client
GetQueueGroupService(name string) (QueueGroupServiceClient, error)
// GetStreamConsumerService retrieves a stream consumer service client
// that can publish messages to the stream for consumption.
GetStreamConsumerService(name string) (StreamConsumerServiceClient, error)
// Has checks if a service with the given name is registered
Has(name string) bool
// Unregister removes a service from the container
Unregister(name string) error
// Entries returns all registered ServiceEntry pointers
Entries() []*ServiceEntry
// StartChannelRouters starts router goroutines for all registered channel services.
// This is called by the lifecycle manager after all modules have started.
// Router goroutines handle message routing from provider out channels to per-consumer channels.
StartChannelRouters(ctx context.Context)
}
ServiceContainer provides dependency injection with name-based registration.
The Service Container supports four types of services:
- Channel services: Bidirectional Go channel communication (in-process)
- RequestReply services: Synchronous NATS request/response pattern
- Queue Group services: Asynchronous NATS queue subscription pattern
- StreamConsumer services: JetStream durable pull consumer pattern
See docs/spec/foundation.md for detailed design documentation.
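The per-consumer out-channel design behind GetChannelService and StartChannelRouters can be modeled as a fan-out router: one goroutine copies each message from the provider's out channel to a dedicated channel per consumer module. This is an illustrative sketch with local stand-in types, not the framework's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is a local stand-in for the framework's message type.
type Msg struct{ Data []byte }

// router fans out provider messages to one dedicated channel per
// consumer module, so consumers never race on a shared channel.
type router struct {
	mu        sync.RWMutex
	consumers map[string]chan *Msg
}

// channelFor returns (creating if needed) the dedicated channel
// for a consumer module, analogous to GetChannelService.
func (r *router) channelFor(consumerModule string) chan *Msg {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.consumers[consumerModule]
	if !ok {
		ch = make(chan *Msg, 1)
		r.consumers[consumerModule] = ch
	}
	return ch
}

// run copies each provider message to every consumer channel,
// analogous to the goroutines started by StartChannelRouters.
func (r *router) run(out <-chan *Msg) {
	for msg := range out {
		r.mu.RLock()
		for _, ch := range r.consumers {
			ch <- msg
		}
		r.mu.RUnlock()
	}
}

func main() {
	out := make(chan *Msg)
	r := &router{consumers: map[string]chan *Msg{}}
	a := r.channelFor("module-a")
	b := r.channelFor("module-b")
	go r.run(out)
	out <- &Msg{Data: []byte("hello")}
	fmt.Println(string((<-a).Data), string((<-b).Data)) // hello hello
}
```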
type ServiceEntry ¶
type ServiceEntry struct {
Name string
Type ServiceType
InChannel chan *Msg
OutChannel chan *Msg
// ConsumerChannels maintains per-consumer output channels to prevent race conditions
// when multiple modules consume from the same channel service. Map key is the consumer
// module name. Access must be synchronized using ConsumerMu.
ConsumerChannels map[string]chan *Msg
// ConsumerMu synchronizes access to ConsumerChannels during read/write operations.
// The router goroutine uses RLock for routing, while GetChannelService uses Lock
// when creating new consumer channels.
ConsumerMu sync.RWMutex
RequestHandler RequestReplyHandler
QueueGroup string // Queue group for RequestReply services
QueueHandlers []QGHP // Multiple handler pairs for QueueGroup services
StreamConsumerConfig *StreamConsumerConfig // For StreamConsumer type
StreamConsumerHandler StreamConsumerHandler // For StreamConsumer type
ModuleName string
Subject string
Created time.Time
}
ServiceEntry represents a registered service.
type ServiceProviderModule ¶
type ServiceProviderModule interface {
Module
// RegisterServices is called after SetDependencyServiceContainer but before Start().
// Modules should register all services they provide using the given container.
//
// The container must be bound to this module using container.BindModule(m) before
// registering services. Services registered here will be available to other modules
// that have a dependency on this module.
//
// If RegisterServices returns an error, framework initialization will fail and
// no modules will be started.
RegisterServices(container ServiceContainer) error
}
ServiceProviderModule allows modules to register services they provide.
Modules implementing this interface can register services that other modules can access. Services are registered by name and type (Channel, RequestReply, QueueGroup).
RegisterServices is called during framework initialization:
- After SetDependencyServiceContainer (if DependentModule)
- Before Start()
- For all modules before any module's Start() is called
This ensures all services are registered before modules start handling requests.
Example:
func (m *InventoryModule) RegisterServices(container ServiceContainer) error {
	// Bind container to this module
	if err := container.BindModule(m); err != nil {
		return err
	}

	// Register a request-reply service
	handler := func(ctx context.Context, msg *Msg) (*Msg, error) {
		// Handle inventory check request
		return &Msg{Data: []byte("in-stock")}, nil
	}
	return container.RegisterRequestReplyService("check-stock", handler)
}
type ServiceRegistration ¶
type ServiceRegistration struct {
// Type identifies the service type
Type ServiceType
// Name is the service name
Name string
// ModuleName is the name of the module registering this service
ModuleName string
// Subject is the NATS subject for NATS-based services
Subject string
// RequestHandler processes RequestReply requests (can be wrapped by middleware)
RequestHandler RequestReplyHandler
// QueueHandlers contains queue group handler pairs (handlers can be wrapped)
QueueHandlers []QGHP
// StreamHandler processes StreamConsumer message batches (can be wrapped by middleware)
StreamHandler StreamConsumerHandler
// StreamConsumerConfig configures the JetStream consumer (can be modified by middleware)
StreamConsumerConfig *StreamConsumerConfig
// InChannel is the input channel for Channel services
InChannel chan *Msg
// OutChannel is the output channel for Channel services
OutChannel chan *Msg
// Metadata holds extensible custom data that middleware can attach
Metadata map[string]any
}
ServiceRegistration contains service registration data that can be modified by middleware.
Middleware can:
- Wrap handlers (decorator pattern) by replacing handler functions
- Modify configuration (e.g., StreamConsumerConfig)
- Add metadata for other middleware
Only the fields relevant to the ServiceType are populated:
- RequestReply: Type, Name, ModuleName, Subject, RequestHandler
- QueueGroup: Type, Name, ModuleName, Subject, QueueHandlers
- StreamConsumer: Type, Name, ModuleName, Subject, StreamHandler, StreamConsumerConfig
- Channel: Type, Name, ModuleName, InChannel, OutChannel
type ServiceType ¶
type ServiceType int
ServiceType identifies the type of service.
Example ¶
ExampleServiceType demonstrates the service type constants.
package main

import (
	"fmt"

	"github.com/go-monolith/mono/pkg/types"
)

func main() {
	fmt.Println("Channel:", types.FormatServiceType(types.ServiceTypeChannel))
	fmt.Println("RequestReply:", types.FormatServiceType(types.ServiceTypeRequestReply))
	fmt.Println("QueueGroup:", types.FormatServiceType(types.ServiceTypeQueueGroup))
	fmt.Println("StreamConsumer:", types.FormatServiceType(types.ServiceTypeStreamConsumer))
}

Output:
Channel: channel
RequestReply: request_reply
QueueGroup: queue_group
StreamConsumer: stream_consumer
const (
	ServiceTypeChannel ServiceType = iota
	ServiceTypeRequestReply
	ServiceTypeQueueGroup
	ServiceTypeStreamConsumer // JetStream durable pull consumer
)
type StorageType ¶
type StorageType int
StorageType defines the storage backend for a stream.
StorageType controls where stream messages are persisted. The choice affects performance, durability, and resource usage:
- Use FileStorage for durable, persistent storage (survives server restarts)
- Use MemoryStorage for high-performance scenarios with acceptable data loss risk
const (
	// FileStorage stores messages on disk.
	// Messages are persisted to the file system and survive server restarts.
	// Use this for production workloads requiring durability: financial transactions,
	// audit logs, compliance data, and any critical business events.
	// Performance: Slightly slower than MemoryStorage but with full durability.
	// Data loss risk: None - messages survive server crashes.
	// This is the recommended storage type for most use cases.
	FileStorage StorageType = iota

	// MemoryStorage stores messages in memory (RAM).
	// Messages are lost if the server restarts or crashes.
	// Use this for high-throughput, non-critical scenarios where you prioritize
	// performance over durability: real-time analytics, metrics collection, and
	// streams where you have multiple replicas and can tolerate occasional data loss.
	// Performance: Very fast, lowest latency.
	// Data loss risk: High - all messages lost on server restart.
	// Only use if you have failure tolerance built into your architecture.
	MemoryStorage
)
type StoreCompression ¶
type StoreCompression uint8
StoreCompression determines how messages are compressed in the stream storage.
StoreCompression controls whether and how messages are compressed on disk. Compression trades CPU for storage space. Choose based on your hardware and workload. The default is NoCompression (raw storage).
const (
	// NoCompression disables compression; messages are stored as-is.
	// This is the default.
	// Use this for: High-throughput scenarios, CPU-constrained environments, fast access patterns.
	// Performance: Lowest latency, no compression overhead.
	// Storage: Uses full message size (largest storage footprint).
	// Recommended for: Real-time systems, where CPU is limited and compression overhead unacceptable.
	// Good for: Metrics streams, high-frequency trading, low-latency requirements.
	NoCompression StoreCompression = iota

	// S2Compression enables S2 (Snappy) compression on messages in storage.
	// Uses the fast S2 algorithm for compression/decompression.
	// Use this for: Storage-constrained environments, cost-sensitive scenarios, batch processing.
	// Performance: Slight latency increase due to compression, but still very fast (fast codec).
	// Storage: Significantly reduced storage size (typically 50-80% reduction for JSON/text data).
	// CPU trade-off: Uses more CPU for compression during write and decompression on read.
	// Recommended for: Large archives, compliance storage, cost-optimized systems.
	// Good for: Event archives, audit logs, backup storage where speed matters but size is critical.
	// Note: S2 is faster than GZIP but slightly less compressed; good balance for most use cases.
	S2Compression
)
type StreamConfig ¶
type StreamConfig struct {
// Name is the name of the stream. It is required and must be unique
// across the JetStream account.
//
// Names cannot contain whitespace, ., *, >, path separators
// (forward or backwards slash), and non-printable characters.
Name string `json:"name"`
// Description is an optional description of the stream.
Description string `json:"description,omitempty"`
// Subjects is a list of subjects that the stream is listening on.
// Wildcards are supported. Subjects cannot be set if the stream is
// created as a mirror.
Subjects []string `json:"subjects,omitempty"`
// Retention defines the message retention policy for the stream.
// Defaults to LimitsPolicy.
Retention RetentionPolicy `json:"retention"`
// MaxConsumers specifies the maximum number of consumers allowed for
// the stream. If set to 0, server default is -1 (unlimited).
MaxConsumers int `json:"max_consumers"`
// MaxMsgs is the maximum number of messages the stream will store.
// After reaching the limit, stream adheres to the discard policy.
// If not set, server default is -1 (unlimited).
MaxMsgs int64 `json:"max_msgs"`
// MaxBytes is the maximum total size of messages the stream will store.
// After reaching the limit, stream adheres to the discard policy.
// If not set, server default is -1 (unlimited).
MaxBytes int64 `json:"max_bytes"`
// Discard defines the policy for handling messages when the stream
// reaches its limits in terms of number of messages or total bytes.
Discard DiscardPolicy `json:"discard"`
// DiscardNewPerSubject is a flag to enable discarding new messages per
// subject when limits are reached. Requires DiscardPolicy to be
// DiscardNew and the MaxMsgsPerSubject to be set.
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
// MaxAge is the maximum age of messages that the stream will retain.
MaxAge time.Duration `json:"max_age"`
// MaxMsgsPerSubject is the maximum number of messages per subject that
// the stream will retain.
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
// MaxMsgSize is the maximum size of any single message in the stream.
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
// Storage specifies the type of storage backend used for the stream
// (file or memory).
Storage StorageType `json:"storage"`
// Replicas is the number of stream replicas in clustered JetStream.
// If set to 0, server default is 1. Maximum is 5.
Replicas int `json:"num_replicas"`
// NoAck is a flag to disable acknowledging messages received by this
// stream.
//
// If set to true, publish methods from the JetStream client will not
// work as expected, since they rely on acknowledgements. Core NATS
// publish methods should be used instead. Note that this will make
// message delivery less reliable.
NoAck bool `json:"no_ack,omitempty"`
// Duplicates is the window within which to track duplicate messages.
// If not set, server default is 2 minutes.
Duplicates time.Duration `json:"duplicate_window,omitempty"`
// Placement is used to declare where the stream should be placed via
// tags and/or an explicit cluster name.
Placement *Placement `json:"placement,omitempty"`
// Mirror defines the configuration for mirroring another stream.
Mirror *StreamSource `json:"mirror,omitempty"`
// Sources is a list of other streams this stream sources messages from.
Sources []*StreamSource `json:"sources,omitempty"`
// Sealed streams do not allow messages to be published or deleted via
// limits or the API, and a sealed stream cannot be unsealed via a
// configuration update. This can only be set on already-created streams
// via the Update API.
Sealed bool `json:"sealed,omitempty"`
// DenyDelete restricts the ability to delete messages from a stream via
// the API. Defaults to false.
DenyDelete bool `json:"deny_delete,omitempty"`
// DenyPurge restricts the ability to purge messages from a stream via
// the API. Defaults to false.
DenyPurge bool `json:"deny_purge,omitempty"`
// AllowRollup allows the use of the Nats-Rollup header to replace all
// contents of a stream, or subject in a stream, with a single new
// message.
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
// Compression specifies the message storage compression algorithm.
// Defaults to NoCompression.
Compression StoreCompression `json:"compression"`
// FirstSeq is the initial sequence number of the first message in the
// stream.
FirstSeq uint64 `json:"first_seq,omitempty"`
// SubjectTransform allows applying a transformation to matching
// messages' subjects.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
// RePublish allows immediately republishing a message to the configured
// subject after it is stored.
RePublish *RePublish `json:"republish,omitempty"`
// AllowDirect enables direct access to individual messages using direct
// get API. Defaults to false.
AllowDirect bool `json:"allow_direct"`
// MirrorDirect enables direct access to individual messages from the
// origin stream using direct get API. Defaults to false.
MirrorDirect bool `json:"mirror_direct"`
// ConsumerLimits defines limits on values that consumers can set, and
// the defaults applied to consumers that do not set them.
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
// Metadata is a set of application-defined key-value pairs for
// associating metadata on the stream. This feature requires nats-server
// v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// Template identifies the template that manages the Stream.
// Deprecated: This feature is no longer supported.
Template string `json:"template_owner,omitempty"`
}
StreamConfig is the configuration of a JetStream stream. This is the framework's internal type that abstracts the underlying JetStream configuration.
Example ¶
ExampleStreamConfig demonstrates configuring a JetStream stream.
package main
import (
"fmt"
"github.com/go-monolith/mono/pkg/types"
)
func main() {
config := types.StreamConfig{
Name: "orders",
Description: "Order events stream",
Subjects: []string{"events.order.>"},
Retention: types.WorkQueuePolicy,
Storage: types.FileStorage,
MaxMsgs: 10000,
MaxBytes: 100 * 1024 * 1024, // 100 MB
}
fmt.Println("Stream name:", config.Name)
fmt.Println("Description:", config.Description)
fmt.Printf("Subjects: %v\n", config.Subjects)
fmt.Println("Retention:", config.Retention)
fmt.Println("Storage:", config.Storage)
fmt.Println("MaxMsgs:", config.MaxMsgs)
fmt.Println("MaxBytes:", config.MaxBytes)
}
Output:

Stream name: orders
Description: Order events stream
Subjects: [events.order.>]
Retention: 2
Storage: 0
MaxMsgs: 10000
MaxBytes: 104857600
type StreamConsumerConfig ¶
type StreamConsumerConfig struct {
// Stream contains the JetStream stream configuration.
// Required: Stream.Name must be set.
// Optional: Stream.Subjects - if empty, defaults to "services.<module>.<service>.*"
Stream StreamConfig `json:"stream"`
// Consumer contains the JetStream consumer configuration.
// The consumer name will be auto-generated if not set.
Consumer ConsumerConfig `json:"consumer"`
// Fetch contains the fetch loop configuration for the pull consumer.
// Optional: defaults are applied for BatchSize (10) and Timeout (5s).
Fetch FetchConfig `json:"fetch"`
}
StreamConsumerConfig configures a JetStream durable pull consumer service. It composes StreamConfig for stream settings, ConsumerConfig for consumer settings, and FetchConfig for fetch loop behavior.
type StreamConsumerHandler ¶
StreamConsumerHandler processes batches of messages from a JetStream pull consumer. The handler receives a slice of messages that should be individually acknowledged.
Messages should be acknowledged using methods like Ack(), Nak(), NakWithDelay(), Term(), or InProgress().
type StreamConsumerLimits ¶
type StreamConsumerLimits struct {
// InactiveThreshold is a duration which instructs the server to clean
// up the consumer if it has been inactive for the specified duration.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// MaxAckPending is a maximum number of outstanding unacknowledged
// messages for a consumer.
MaxAckPending int `json:"max_ack_pending,omitempty"`
}
StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis.
type StreamConsumerServiceClient ¶
type StreamConsumerServiceClient interface {
// Publish publishes a message to the stream (with JetStream persistence)
Publish(ctx context.Context, data []byte) (MsgPubAck, error)
// PublishMsg publishes a complete message with headers to the stream
PublishMsg(ctx context.Context, msg *Msg) (MsgPubAck, error)
}
StreamConsumerServiceClient publishes messages to a JetStream stream for consumption. Messages are persisted in JetStream and will be delivered to the stream consumer.
type StreamSource ¶
type StreamSource struct {
// Name is the name of the stream to source from.
Name string `json:"name"`
// OptStartSeq is the sequence number to start sourcing from.
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
// OptStartTime is the timestamp of messages to start sourcing from.
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
// FilterSubject is the subject filter used to only replicate messages
// with matching subjects.
FilterSubject string `json:"filter_subject,omitempty"`
// SubjectTransforms is a list of subject transforms to apply to
// matching messages.
//
// Subject transforms on sources and mirrors are also used as subject
// filters with optional transformations.
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
// External is a configuration referencing a stream source in another
// account or JetStream domain.
External *ExternalStream `json:"external,omitempty"`
// Domain is used to configure a stream source in another JetStream
// domain. This setting will set the External field with the appropriate
// APIPrefix. This field is not marshaled to JSON; it's used during
// configuration setup.
Domain string `json:"-"`
}
StreamSource dictates how streams can source from other streams.
type SubjectTransformConfig ¶
type SubjectTransformConfig struct {
// Source is the subject pattern to match incoming messages against.
Source string `json:"src"`
// Destination is the subject pattern to remap the subject to.
Destination string `json:"dest"`
}
SubjectTransformConfig applies a subject transform to matching messages, before any other processing, when a new message is received.
type Subscription ¶
type Subscription interface {
// Unsubscribe removes interest in the subscription
Unsubscribe() error
// Drain removes interest but processes pending messages before completion
Drain() error
// IsValid returns false if subscription has been unsubscribed
IsValid() bool
// Subject returns the subject pattern
Subject() string
// Queue returns the queue group name (empty string for non-queue subscriptions)
Queue() string
// NextMsg fetches the next message (for sync subscriptions)
NextMsg(timeout time.Duration) (*Msg, error)
// NextMsgWithContext fetches next message with context cancellation
NextMsgWithContext(ctx context.Context) (*Msg, error)
}
Subscription represents an active subscription.
type TypedEventConsumerHandler ¶
TypedEventConsumerHandler is a type-safe event consumer handler signature. The handler receives the deserialized event directly, along with the raw message for accessing headers and other metadata.
Use RegisterTypedConsumer to register handlers with this signature.
Example:
func (m *Module) handleOrderCreated(ctx context.Context, event OrderCreatedEvent, msg *mono.Msg) error {
// event is already deserialized!
fmt.Println("Order ID:", event.OrderID)
return nil
}
type TypedEventStreamConsumerHandler ¶
type TypedEventStreamConsumerHandler[T any] func(ctx context.Context, events []T, msgs []*Msg) error
TypedEventStreamConsumerHandler is a type-safe batch event handler for JetStream consumers. The handler receives pre-deserialized events along with the raw messages for acknowledgment.
Example:
func (m *Module) handleOrders(ctx context.Context, events []OrderCreatedEvent, msgs []*mono.Msg) error {
for i, event := range events {
fmt.Println("Order ID:", event.OrderID)
msgs[i].Ack()
}
return nil
}
type TypedQGHP ¶
type TypedQGHP[T any] struct {
	QueueGroup string                    // Queue group name (e.g., "high-priority-workers")
	Handler    TypedQueueGroupHandler[T] // Typed handler function for this queue group
}
TypedQGHP associates a queue group name with a typed handler. This is the generic version of QGHP for use with RegisterTypedQueueGroupService.
type TypedQueueGroupHandler ¶
TypedQueueGroupHandler processes typed fire-and-forget messages from a queue group. The handler receives an already-unmarshaled message payload.
type TypedRequestReplyHandler ¶
type TypedRequestReplyHandler[Req any, Resp any] func(ctx context.Context, req Req, msg *Msg) (Resp, error)
TypedRequestReplyHandler processes typed request messages and returns typed responses. The handler receives an already-unmarshaled request and returns a typed response that will be marshaled before sending.
type TypedStreamConsumerHandler ¶
TypedStreamConsumerHandler processes typed batches of messages from a JetStream pull consumer. The handler receives slices of already-unmarshaled payloads and the original messages for acknowledgment.
type Unmarshaler ¶
Unmarshaler defines the function signature for event deserialization. The default is json.Unmarshal, but can be overridden for custom formats (e.g., Protobuf).
type UsePluginModule ¶
type UsePluginModule interface {
Module
// SetPlugin is called by the framework for each registered plugin.
// The module should filter by alias and store the plugins it needs.
// This method is called for all registered plugins in registration order.
SetPlugin(alias string, plugin PluginModule)
}
UsePluginModule allows modules (including middleware) to receive plugin instances from the framework. The framework injects ALL registered plugins via SetPlugin() during module initialization, and the module can filter which plugins it wants to use.
This interface enables plugin discovery without creating explicit dependencies. Modules don't need to declare plugins in Dependencies() - the framework automatically injects all registered plugins, and the module filters by alias.
Plugin Discovery Pattern ¶
Unlike DependentModule which requires declaring dependencies upfront, UsePluginModule provides a discovery-based approach:
- Framework registers plugins with aliases (e.g., "storage", "cache")
- Framework calls SetPlugin() for each registered plugin
- Module filters and stores plugins it needs based on alias
- Module verifies required plugins in Start() before using them
Timing ¶
SetPlugin is called:
- After ServiceContainer binding (BindModule)
- Before SetDependencyServiceContainer
- Before SetEventBus
- Before RegisterServices
- Before Start
Important ¶
Modules implementing this interface should verify that required plugins were injected before using them in Start(). If a required plugin is not registered, the plugin reference will be nil.
Example ¶
type DocumentModule struct {
storagePlugin mono.PluginModule
}
func (m *DocumentModule) Name() string { return "documents" }
func (m *DocumentModule) SetPlugin(alias string, plugin mono.PluginModule) {
// Filter and store only the plugins you need
if alias == "primary-storage" {
m.storagePlugin = plugin
}
}
func (m *DocumentModule) Start(ctx context.Context) error {
// Verify required plugin was injected
if m.storagePlugin == nil {
return fmt.Errorf("required plugin 'primary-storage' not registered")
}
// Access plugin's services via its container
container := m.storagePlugin.Container()
// Use services...
return nil
}