Documentation ¶
Overview ¶
Package event. batch provides batch processing capabilities for events.
Package event. event_bus provides the core pub/sub event bus implementation.
Package event. event_types provides a strongly-typed event bus system for decoupling core logic from side effects.
Package event. filter provides event filtering capabilities for conditional handler execution.
Package event. global provides a singleton event bus instance for application-wide use.
Package event. middleware provides composable middleware for event handlers.
Package event. priority provides priority-based event processing.
Index ¶
- Constants
- func InitGlobalBus(cfg *Config)
- func Publish(event *Event)
- func ShutdownGlobalBus()
- func Subscribe(eventType EventType, handler EventHandler) string
- type BatchConfig
- type BatchHandler
- type BatchProcessor
- type Config
- type Event
- type EventBus
- type EventHandler
- type EventType
- type Filter
- type FilteredHandler
- type LoginFailedData
- type LoginSuccessData
- type Middleware
- type PriorityEvent
- type PriorityQueue
- type Subscription
- type TokenRevokedData
- type UserCreatedData
- type UserUpdatedData
Constants ¶
const (
PriorityLow = 1
PriorityNormal = 5
PriorityHigh = 10
)
Priority levels for events.
Variables ¶
This section is empty.
Functions ¶
func InitGlobalBus ¶
func InitGlobalBus(cfg *Config)
InitGlobalBus initializes the global event bus with a custom configuration. IMPORTANT: This function should be called once at the start of your application, before any goroutines that might call GetGlobalBus are started. It has no effect if the global bus has already been initialized (e.g., by a call to GetGlobalBus).
func Publish ¶
func Publish(event *Event)
Publish is a convenience function to publish events to the global bus.
func ShutdownGlobalBus ¶
func ShutdownGlobalBus()
ShutdownGlobalBus shuts down the global event bus.
func Subscribe ¶
func Subscribe(eventType EventType, handler EventHandler) string
Subscribe is a convenience function to subscribe to the global bus.
Types ¶
type BatchConfig ¶ added in v0.5.0
type BatchConfig struct {
BatchSize int // Max events per batch
FlushPeriod time.Duration // Max time to wait before flushing
}
BatchConfig holds configuration for batch processing.
type BatchHandler ¶ added in v0.5.0
BatchHandler processes multiple events at once.
type BatchProcessor ¶ added in v0.5.0
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor collects events and processes them in batches.
func NewBatchProcessor ¶ added in v0.5.0
func NewBatchProcessor(eventType EventType, config BatchConfig, handler BatchHandler) *BatchProcessor
NewBatchProcessor creates a new batch processor.
func (*BatchProcessor) Add ¶ added in v0.5.0
func (bp *BatchProcessor) Add(event *Event)
Add adds an event to the batch buffer.
func (*BatchProcessor) AsEventHandler ¶ added in v0.5.0
func (bp *BatchProcessor) AsEventHandler() EventHandler
AsEventHandler returns an EventHandler that adds events to the batch.
func (*BatchProcessor) Flush ¶ added in v0.5.0
func (bp *BatchProcessor) Flush()
Flush processes all buffered events immediately.
func (*BatchProcessor) Shutdown ¶ added in v0.5.0
func (bp *BatchProcessor) Shutdown()
Shutdown stops the batch processor and flushes remaining events.
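The following is a minimal sketch of how size-based and time-based flushing can fit together, not the package's actual implementation. The `batcher` type, the `demoBatchSizes` helper, and the `BatchHandler` signature (`func([]*Event)`) are assumptions made for illustration; the documentation above does not show them.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ Data any }

// BatchConfig mirrors the documented fields.
type BatchConfig struct {
	BatchSize   int
	FlushPeriod time.Duration
}

// BatchHandler's signature is an assumption; the documentation does not show it.
type BatchHandler func(events []*Event)

// batcher is a hypothetical stand-in for BatchProcessor.
type batcher struct {
	mu      sync.Mutex
	cfg     BatchConfig
	buf     []*Event
	handler BatchHandler
	stop    chan struct{}
	done    chan struct{}
}

func newBatcher(cfg BatchConfig, h BatchHandler) *batcher {
	b := &batcher{cfg: cfg, handler: h, stop: make(chan struct{}), done: make(chan struct{})}
	go b.loop()
	return b
}

// loop flushes on every tick until Shutdown closes stop.
func (b *batcher) loop() {
	defer close(b.done)
	t := time.NewTicker(b.cfg.FlushPeriod)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			b.Flush()
		case <-b.stop:
			return
		}
	}
}

// Add buffers an event and flushes as soon as the batch is full.
func (b *batcher) Add(e *Event) {
	b.mu.Lock()
	b.buf = append(b.buf, e)
	full := len(b.buf) >= b.cfg.BatchSize
	b.mu.Unlock()
	if full {
		b.Flush()
	}
}

// Flush hands the buffered events to the handler and resets the buffer.
func (b *batcher) Flush() {
	b.mu.Lock()
	batch := b.buf
	b.buf = nil
	b.mu.Unlock()
	if len(batch) > 0 {
		b.handler(batch)
	}
}

// Shutdown stops the ticker goroutine, then flushes whatever is left.
func (b *batcher) Shutdown() {
	close(b.stop)
	<-b.done
	b.Flush()
}

// demoBatchSizes feeds three events through a batch size of two.
func demoBatchSizes() []int {
	var sizes []int
	b := newBatcher(BatchConfig{BatchSize: 2, FlushPeriod: time.Hour}, func(es []*Event) {
		sizes = append(sizes, len(es))
	})
	for i := 0; i < 3; i++ {
		b.Add(&Event{Data: i})
	}
	b.Shutdown()
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes()) // [2 1]
}
```

In this sketch the first two events trigger a size-based flush and the third is delivered by Shutdown; the long FlushPeriod keeps the ticker out of the picture.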
type Config ¶
type Config struct {
Workers int // Number of worker goroutines
BufferSize int // Size of the event channel buffer
}
Config holds configuration for the event bus.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a sensible default configuration.
type Event ¶
type Event struct {
Type EventType // Strongly-typed event identifier
Timestamp time.Time // When the event occurred
Data any // Event-specific payload
Metadata map[string]any // Additional context (e.g., request ID, user agent)
}
Event represents a generic event that flows through the system.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages event publishing and subscription.
func GetGlobalBus ¶
func GetGlobalBus() *EventBus
GetGlobalBus returns the singleton event bus instance. On the first call, it initializes the bus with the DefaultConfig configuration if it hasn't been initialized by InitGlobalBus already. Subsequent calls will return the same instance.
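The "first initializer wins" behavior described for InitGlobalBus and GetGlobalBus is commonly built on `sync.Once`. The sketch below assumes that approach; the lower-case function names, the stand-in types, and the default values (4 workers, buffer of 100) are hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Config mirrors the documented fields.
type Config struct {
	Workers    int
	BufferSize int
}

// EventBus is a stand-in for the package's type.
type EventBus struct{ cfg Config }

var (
	once   sync.Once
	global *EventBus
)

// initGlobalBus installs cfg only if no bus exists yet.
func initGlobalBus(cfg *Config) {
	once.Do(func() { global = &EventBus{cfg: *cfg} })
}

// getGlobalBus lazily falls back to an assumed default configuration.
func getGlobalBus() *EventBus {
	once.Do(func() { global = &EventBus{cfg: Config{Workers: 4, BufferSize: 100}} })
	return global
}

// demoGlobal shows that a second init call is ignored and that the
// accessor always returns the same instance.
func demoGlobal() (int, bool) {
	initGlobalBus(&Config{Workers: 8, BufferSize: 256})
	initGlobalBus(&Config{Workers: 1, BufferSize: 1}) // no effect: already initialized
	return getGlobalBus().cfg.Workers, getGlobalBus() == getGlobalBus()
}

func main() {
	fmt.Println(demoGlobal()) // 8 true
}
```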
func NewEventBus ¶
NewEventBus creates a new event bus with the given configuration.
func (*EventBus) PublishSync ¶
PublishSync sends an event to all registered handlers synchronously.
func (*EventBus) Shutdown ¶
func (eb *EventBus) Shutdown()
Shutdown gracefully stops the event bus and waits for all workers to finish.
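A graceful shutdown that waits for workers is typically a closed channel plus a `sync.WaitGroup`. The sketch below shows that pattern using the documented `Workers` and `BufferSize` fields; `workerBus`, its `Publish` method, and the single `handle` callback are illustrative assumptions rather than the package's internals.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ Data any }

// Config mirrors the documented fields.
type Config struct {
	Workers    int
	BufferSize int
}

// workerBus is a hypothetical, simplified bus with one handler.
type workerBus struct {
	events chan *Event
	wg     sync.WaitGroup
}

func newWorkerBus(cfg Config, handle func(*Event)) *workerBus {
	b := &workerBus{events: make(chan *Event, cfg.BufferSize)}
	for i := 0; i < cfg.Workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			// The loop ends once the channel is closed and drained.
			for e := range b.events {
				handle(e)
			}
		}()
	}
	return b
}

// Publish blocks when the buffer is full. Calling it after Shutdown
// would panic in this sketch because the channel is closed.
func (b *workerBus) Publish(e *Event) { b.events <- e }

// Shutdown stops intake and waits until every queued event is handled.
func (b *workerBus) Shutdown() {
	close(b.events)
	b.wg.Wait()
}

// demoWorkers publishes 20 events and reports how many were handled.
func demoWorkers() int64 {
	var handled int64
	b := newWorkerBus(Config{Workers: 3, BufferSize: 8}, func(*Event) {
		atomic.AddInt64(&handled, 1)
	})
	for i := 0; i < 20; i++ {
		b.Publish(&Event{Data: i})
	}
	b.Shutdown()
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println(demoWorkers()) // 20
}
```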
func (*EventBus) Subscribe ¶
func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler) string
Subscribe registers a handler for a specific event type and returns a subscription ID.
func (*EventBus) Unsubscribe ¶
Unsubscribe removes a subscription by ID.
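To illustrate the subscribe, publish, and unsubscribe cycle, here is a small self-contained sketch. The `bus` type, the ID format, and the `EventHandler` signature (`func(*Event) error`) are assumptions; this page does not show the real signatures of EventHandler, PublishSync, or Unsubscribe.

```go
package main

import (
	"fmt"
	"sync"
)

type EventType string

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Type EventType
	Data any
}

// EventHandler's signature is an assumption.
type EventHandler func(*Event) error

// bus is a hypothetical synchronous bus keyed by event type.
type bus struct {
	mu     sync.RWMutex
	nextID int
	subs   map[EventType]map[string]EventHandler
}

func newBus() *bus { return &bus{subs: make(map[EventType]map[string]EventHandler)} }

// Subscribe registers a handler and returns a generated subscription ID.
func (b *bus) Subscribe(t EventType, h EventHandler) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	id := fmt.Sprintf("sub-%d", b.nextID)
	if b.subs[t] == nil {
		b.subs[t] = make(map[string]EventHandler)
	}
	b.subs[t][id] = h
	return id
}

// Unsubscribe removes the handler registered under id, if any.
func (b *bus) Unsubscribe(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, handlers := range b.subs {
		delete(handlers, id)
	}
}

// PublishSync calls every handler for the event's type in the caller's goroutine.
func (b *bus) PublishSync(e *Event) {
	b.mu.RLock()
	handlers := make([]EventHandler, 0, len(b.subs[e.Type]))
	for _, h := range b.subs[e.Type] {
		handlers = append(handlers, h)
	}
	b.mu.RUnlock()
	for _, h := range handlers {
		_ = h(e) // errors are ignored in this sketch
	}
}

// demoSubscribe returns the handler call count before and after Unsubscribe.
func demoSubscribe() (int, int) {
	b := newBus()
	calls := 0
	id := b.Subscribe("auth.logout", func(*Event) error { calls++; return nil })
	b.PublishSync(&Event{Type: "auth.logout"})
	b.PublishSync(&Event{Type: "user.updated"}) // different type: not delivered
	before := calls
	b.Unsubscribe(id)
	b.PublishSync(&Event{Type: "auth.logout"})
	return before, calls
}

func main() {
	fmt.Println(demoSubscribe()) // 1 1
}
```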
type EventHandler ¶
EventHandler is a function that processes events.
func NewFilteredHandler ¶ added in v0.4.0
func NewFilteredHandler(filter Filter, handler EventHandler) EventHandler
NewFilteredHandler creates a handler that only executes when the filter returns true.
type EventType ¶
type EventType string
EventType represents a strongly-typed event identifier to prevent typos and ensure consistency.
const (
// Authentication Events
AuthEventUserCreated EventType = "auth.user.created"
AuthEventLoginSuccess EventType = "auth.login.success"
AuthEventLoginFailed EventType = "auth.login.failed"
AuthEventLogout EventType = "auth.logout"
AuthEventPasswordChanged EventType = "auth.password.changed"
AuthEventTokenRevoked EventType = "auth.token.revoked"
// User Management Events
UserEventUpdated EventType = "user.updated"
UserEventDeleted EventType = "user.deleted"
UserEventEnabled EventType = "user.enabled"
UserEventDisabled EventType = "user.disabled"
// Organization Events
OrgEventCreated EventType = "org.created"
OrgEventUpdated EventType = "org.updated"
OrgEventDeleted EventType = "org.deleted"
// Permission Events
PermissionEventGranted EventType = "permission.granted"
PermissionEventRevoked EventType = "permission.revoked"
)
Event type constants for different domains in the system. These constants are provided as examples; you can extend them as needed by converting your own string to EventType.
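As a brief illustration of defining your own event types, the sketch below declares one application-specific constant and builds another at run time. `BillingEventInvoicePaid` and `tenantEvent` are hypothetical names, not part of the package.

```go
package main

import "fmt"

// EventType matches the documented definition.
type EventType string

const (
	// AuthEventLogout is one of the documented constants.
	AuthEventLogout EventType = "auth.logout"
	// BillingEventInvoicePaid is a hypothetical application-defined type.
	BillingEventInvoicePaid EventType = "billing.invoice.paid"
)

// tenantEvent builds an event type at run time by converting a plain string.
func tenantEvent(tenant, action string) EventType {
	return EventType("tenant." + tenant + "." + action)
}

func main() {
	fmt.Println(BillingEventInvoicePaid)           // billing.invoice.paid
	fmt.Println(tenantEvent("acme", "suspended")) // tenant.acme.suspended
}
```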
type Filter ¶ added in v0.4.0
Filter determines if an event should be processed by a handler.
func FilterByMetadata ¶ added in v0.4.0
FilterByMetadata creates a filter that checks for specific metadata key-value pairs.
func FilterByMetadataExists ¶ added in v0.4.0
FilterByMetadataExists creates a filter that checks whether a metadata key exists.
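The sketch below shows one way metadata filters and a filtered handler could be composed. The signatures of `Filter` (`func(*Event) bool`), `EventHandler` (`func(*Event) error`), and the two filter constructors are assumptions, since this page does not list them; the lower-case names mark them as stand-ins.

```go
package main

import (
	"fmt"
	"reflect"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Metadata map[string]any
}

// Assumed signatures; the documentation does not show them.
type EventHandler func(*Event) error
type Filter func(*Event) bool

// filterByMetadata matches events whose metadata holds key with an equal value.
func filterByMetadata(key string, value any) Filter {
	return func(e *Event) bool {
		v, ok := e.Metadata[key]
		return ok && reflect.DeepEqual(v, value)
	}
}

// filterByMetadataExists matches events whose metadata contains key at all.
func filterByMetadataExists(key string) Filter {
	return func(e *Event) bool {
		_, ok := e.Metadata[key]
		return ok
	}
}

// newFilteredHandler skips the handler (returning nil) when the filter rejects.
func newFilteredHandler(f Filter, h EventHandler) EventHandler {
	return func(e *Event) error {
		if !f(e) {
			return nil
		}
		return h(e)
	}
}

// demoFilters counts how many of three events pass each filter.
func demoFilters() (int, int) {
	events := []*Event{
		{Metadata: map[string]any{"source": "api"}},
		{Metadata: map[string]any{"source": "cli"}},
		{}, // nil metadata: lookups on a nil map are safe in Go
	}
	byValue, byKey := 0, 0
	h1 := newFilteredHandler(filterByMetadata("source", "api"), func(*Event) error { byValue++; return nil })
	h2 := newFilteredHandler(filterByMetadataExists("source"), func(*Event) error { byKey++; return nil })
	for _, e := range events {
		_ = h1(e)
		_ = h2(e)
	}
	return byValue, byKey
}

func main() {
	fmt.Println(demoFilters()) // 1 2
}
```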
type FilteredHandler ¶ added in v0.4.0
type FilteredHandler struct {
// contains filtered or unexported fields
}
FilteredHandler wraps a handler with a filter condition.
type LoginFailedData ¶
LoginFailedData contains information about a failed login attempt.
type LoginSuccessData ¶
type LoginSuccessData struct {
UserID string
Username string
Organization string
IPAddress string
UserAgent string
}
LoginSuccessData contains information about a successful login.
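Because `Event.Data` is declared as `any`, a handler has to recover the typed payload with a type assertion or type switch. The sketch below handles both value and pointer payloads, since this page does not say which form publishers use; `usernameOf` is a hypothetical helper.

```go
package main

import "fmt"

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Data any
}

// LoginSuccessData mirrors the documented fields.
type LoginSuccessData struct {
	UserID       string
	Username     string
	Organization string
	IPAddress    string
	UserAgent    string
}

// usernameOf extracts the username if the payload is a LoginSuccessData,
// stored either by value or by pointer.
func usernameOf(e *Event) (string, bool) {
	switch d := e.Data.(type) {
	case LoginSuccessData:
		return d.Username, true
	case *LoginSuccessData:
		if d == nil {
			return "", false
		}
		return d.Username, true
	default:
		return "", false
	}
}

func main() {
	fmt.Println(usernameOf(&Event{Data: LoginSuccessData{Username: "alice"}})) // alice true
	fmt.Println(usernameOf(&Event{Data: &LoginSuccessData{Username: "bob"}}))  // bob true
	fmt.Println(usernameOf(&Event{Data: "unexpected"}))                        //  false
}
```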
type Middleware ¶ added in v0.3.0
type Middleware func(EventHandler) EventHandler
Middleware wraps an EventHandler with additional behavior.
func Chain ¶ added in v0.3.0
func Chain(middlewares ...Middleware) Middleware
Chain chains multiple middlewares together.
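A sketch of how chaining might compose middlewares follows. It assumes the first middleware passed ends up outermost, which is a common convention but is not stated on this page; the `EventHandler` signature and the `tag` middleware are also hypothetical.

```go
package main

import "fmt"

// Event is a trimmed stand-in for the package's Event type.
type Event struct{}

// EventHandler's signature is an assumption.
type EventHandler func(*Event) error

// Middleware matches the documented definition.
type Middleware func(EventHandler) EventHandler

// chain wraps from the last middleware inward so that ms[0] runs first.
func chain(ms ...Middleware) Middleware {
	return func(h EventHandler) EventHandler {
		for i := len(ms) - 1; i >= 0; i-- {
			h = ms[i](h)
		}
		return h
	}
}

// tag is a hypothetical middleware that records when it is entered and left.
func tag(name string, trace *[]string) Middleware {
	return func(next EventHandler) EventHandler {
		return func(e *Event) error {
			*trace = append(*trace, name+":in")
			err := next(e)
			*trace = append(*trace, name+":out")
			return err
		}
	}
}

// demoChain returns the order in which the wrapped pieces execute.
func demoChain() []string {
	var trace []string
	h := chain(tag("a", &trace), tag("b", &trace))(func(*Event) error {
		trace = append(trace, "handler")
		return nil
	})
	_ = h(&Event{})
	return trace
}

func main() {
	fmt.Println(demoChain()) // [a:in b:in handler b:out a:out]
}
```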
func WithLogging ¶ added in v0.3.0
func WithLogging() Middleware
WithLogging logs handler execution.
func WithRecovery ¶ added in v0.3.0
func WithRecovery() Middleware
WithRecovery recovers from panics in handlers.
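Panic recovery in a handler wrapper usually relies on a deferred `recover` that writes to a named return value. The sketch below assumes an `EventHandler` of the form `func(*Event) error` and converts the panic into an error; how the real WithRecovery reports the panic is not documented here.

```go
package main

import "fmt"

// Event is a trimmed stand-in for the package's Event type.
type Event struct{}

// EventHandler's signature is an assumption.
type EventHandler func(*Event) error

// Middleware matches the documented definition.
type Middleware func(EventHandler) EventHandler

// withRecovery turns a panic in next into an ordinary error.
func withRecovery() Middleware {
	return func(next EventHandler) EventHandler {
		return func(e *Event) (err error) {
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("handler panicked: %v", r)
				}
			}()
			return next(e)
		}
	}
}

// demoRecovery runs a panicking handler through the middleware.
func demoRecovery() error {
	h := withRecovery()(func(*Event) error { panic("boom") })
	return h(&Event{})
}

func main() {
	fmt.Println(demoRecovery()) // handler panicked: boom
}
```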
func WithRetry ¶ added in v0.3.0
func WithRetry(maxRetries int, initialDelay time.Duration) Middleware
WithRetry retries failed handlers with exponential backoff.
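The retry behavior can be sketched as below. This version assumes that `maxRetries` counts attempts after the first one, that the delay doubles after each failure, and that handlers signal failure by returning an error; none of these details are confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{}

// EventHandler's signature is an assumption.
type EventHandler func(*Event) error

// Middleware matches the documented definition.
type Middleware func(EventHandler) EventHandler

// withRetry calls next up to maxRetries+1 times, doubling the delay each time.
func withRetry(maxRetries int, initialDelay time.Duration) Middleware {
	return func(next EventHandler) EventHandler {
		return func(e *Event) error {
			delay := initialDelay
			var err error
			for attempt := 0; attempt <= maxRetries; attempt++ {
				if err = next(e); err == nil {
					return nil
				}
				if attempt < maxRetries {
					time.Sleep(delay)
					delay *= 2
				}
			}
			return err // the last error, once retries are exhausted
		}
	}
}

// demoRetry uses a handler that fails twice and then succeeds.
func demoRetry() (int, error) {
	calls := 0
	h := withRetry(3, time.Millisecond)(func(*Event) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	err := h(&Event{})
	return calls, err
}

func main() {
	fmt.Println(demoRetry()) // 3 <nil>
}
```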
func WithTimeout ¶ added in v0.3.0
func WithTimeout(timeout time.Duration) Middleware
WithTimeout adds a timeout to handler execution.
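One way to bound handler execution is to run the handler in a goroutine and race it against a timer, as sketched below. The `errTimeout` value and the `EventHandler` signature are assumptions. Note that in this approach a timed-out handler keeps running in the background; whether the real WithTimeout cancels it is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{}

// EventHandler's signature is an assumption.
type EventHandler func(*Event) error

// Middleware matches the documented definition.
type Middleware func(EventHandler) EventHandler

// errTimeout is a hypothetical sentinel error.
var errTimeout = errors.New("handler timed out")

// withTimeout returns errTimeout if next does not finish within d.
func withTimeout(d time.Duration) Middleware {
	return func(next EventHandler) EventHandler {
		return func(e *Event) error {
			done := make(chan error, 1) // buffered so a late handler never blocks
			go func() { done <- next(e) }()
			select {
			case err := <-done:
				return err
			case <-time.After(d):
				return errTimeout
			}
		}
	}
}

// demoTimeout runs one slow and one fast handler through the middleware.
func demoTimeout() (slowErr, fastErr error) {
	slow := withTimeout(10 * time.Millisecond)(func(*Event) error {
		time.Sleep(200 * time.Millisecond)
		return nil
	})
	fast := withTimeout(time.Second)(func(*Event) error { return nil })
	return slow(&Event{}), fast(&Event{})
}

func main() {
	slowErr, fastErr := demoTimeout()
	fmt.Println(slowErr) // handler timed out
	fmt.Println(fastErr) // <nil>
}
```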
type PriorityEvent ¶ added in v0.6.0
PriorityEvent wraps an event with priority information.
type PriorityQueue ¶ added in v0.6.0
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue manages priority-based event processing.
func NewPriorityQueue ¶ added in v0.6.0
func NewPriorityQueue() *PriorityQueue
NewPriorityQueue creates a new priority queue.
func (*PriorityQueue) Dequeue ¶ added in v0.6.0
func (pq *PriorityQueue) Dequeue() (*Event, bool)
Dequeue removes and returns the highest priority event.
func (*PriorityQueue) Enqueue ¶ added in v0.6.0
func (pq *PriorityQueue) Enqueue(event *Event, priority int)
Enqueue adds an event with the specified priority.
func (*PriorityQueue) Size ¶ added in v0.6.0
func (pq *PriorityQueue) Size() int
Size returns the number of events in the queue.
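A priority queue with this Enqueue/Dequeue/Size shape can be built on the standard `container/heap` package. The sketch below is one such construction, not the package's implementation; the FIFO tie-break among equal priorities and the mutex are assumptions, and `priorityQueue` is a stand-in name. The priority values match the documented constants.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ Data any }

type item struct {
	event    *Event
	priority int
	seq      int // insertion order, used to break ties FIFO (an assumption)
}

// eventHeap implements heap.Interface with the highest priority on top.
type eventHeap []item

func (h eventHeap) Len() int { return len(h) }
func (h eventHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h eventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *eventHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *eventHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// priorityQueue is a hypothetical stand-in for PriorityQueue.
type priorityQueue struct {
	mu  sync.Mutex
	h   eventHeap
	seq int
}

func (pq *priorityQueue) Enqueue(e *Event, priority int) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	pq.seq++
	heap.Push(&pq.h, item{event: e, priority: priority, seq: pq.seq})
}

// Dequeue reports false when the queue is empty.
func (pq *priorityQueue) Dequeue() (*Event, bool) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	if pq.h.Len() == 0 {
		return nil, false
	}
	return heap.Pop(&pq.h).(item).event, true
}

func (pq *priorityQueue) Size() int {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	return pq.h.Len()
}

// demoOrder enqueues four events and returns them in dequeue order.
func demoOrder() []string {
	pq := &priorityQueue{}
	pq.Enqueue(&Event{Data: "low"}, 1)     // PriorityLow
	pq.Enqueue(&Event{Data: "high"}, 10)   // PriorityHigh
	pq.Enqueue(&Event{Data: "normal"}, 5)  // PriorityNormal
	pq.Enqueue(&Event{Data: "high-2"}, 10) // PriorityHigh
	var order []string
	for {
		e, ok := pq.Dequeue()
		if !ok {
			break
		}
		order = append(order, e.Data.(string))
	}
	return order
}

func main() {
	fmt.Println(demoOrder()) // [high high-2 normal low]
}
```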
type Subscription ¶
type Subscription struct {
ID string
Type EventType
Handler EventHandler
}
Subscription represents a registered event handler.
type TokenRevokedData ¶
TokenRevokedData contains information about a revoked token.
type UserCreatedData ¶
UserCreatedData contains information about a newly created user.