Documentation ¶
Overview ¶
Package capitan provides type-safe event coordination for Go with zero dependencies.
At its core, capitan offers three operations: Emit events with typed fields, Hook listeners to specific signals, and Observe all signals. Events are processed asynchronously with per-signal worker goroutines for isolation and performance.
Context Support: All events carry a context.Context for cancellation, timeouts, and request-scoped values. Canceled contexts prevent event queueing and processing.
Quick example:
sig := capitan.NewSignal("order.created", "New order has been created")
orderID := capitan.NewStringKey("order_id")
capitan.Hook(sig, func(ctx context.Context, e *capitan.Event) {
    id, _ := orderID.From(e)
    fmt.Println("processing order", id) // Process order...
})
capitan.Emit(context.Background(), sig, orderID.Field("ORDER-123"))
capitan.Shutdown() // Drain pending events
See https://github.com/zoobz-io/capitan for full documentation.
Index ¶
- func ApplyConfig(cfg Config) error
- func Configure(opts ...Option)
- func Debug(ctx context.Context, signal Signal, fields ...Field)
- func Emit(ctx context.Context, signal Signal, fields ...Field)
- func Error(ctx context.Context, signal Signal, fields ...Field)
- func Info(ctx context.Context, signal Signal, fields ...Field)
- func Replay(ctx context.Context, e *Event)
- func Shutdown()
- func Warn(ctx context.Context, signal Signal, fields ...Field)
- type BoolKey
- type BytesKey
- type Capitan
- func (c *Capitan) ApplyConfig(cfg Config) error
- func (c *Capitan) Debug(ctx context.Context, signal Signal, fields ...Field)
- func (c *Capitan) Drain(ctx context.Context) error
- func (c *Capitan) Emit(ctx context.Context, signal Signal, fields ...Field)
- func (c *Capitan) Error(ctx context.Context, signal Signal, fields ...Field)
- func (c *Capitan) Hook(signal Signal, callback EventCallback) *Listener
- func (c *Capitan) HookOnce(signal Signal, callback EventCallback) *Listener
- func (c *Capitan) Info(ctx context.Context, signal Signal, fields ...Field)
- func (c *Capitan) IsShutdown() bool
- func (c *Capitan) Observe(callback EventCallback, signals ...Signal) *Observer
- func (c *Capitan) Replay(ctx context.Context, e *Event)
- func (c *Capitan) Shutdown()
- func (c *Capitan) Stats() Stats
- func (c *Capitan) Warn(ctx context.Context, signal Signal, fields ...Field)
- type Config
- type ConfigError
- type DropPolicy
- type DurationKey
- type ErrorKey
- type Event
- type EventCallback
- type Field
- type Float32Key
- type Float64Key
- type GenericField
- type GenericKey
- type Int32Key
- type Int64Key
- type IntKey
- type Key
- type Listener
- type Observer
- type Option
- type PanicHandler
- type Severity
- type Signal
- type SignalConfig
- type Stats
- type StringKey
- type TimeKey
- type Uint32Key
- type Uint64Key
- type UintKey
- type Variant
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyConfig ¶
ApplyConfig applies configuration on the default instance.
func Configure ¶
func Configure(opts ...Option)
Configure sets options for the default Capitan instance. It must be called before any package-level function (Hook, Emit, Observe, Shutdown). Calls made after the default instance has been created have no effect.
func Emit ¶
Emit dispatches an event with Info severity on the default instance.
Events are queued asynchronously and processed by per-signal worker goroutines. If no listeners are registered for the signal, the event is dropped silently. If the context is canceled, the event may be dropped.
Example:
orderCreated := capitan.NewSignal("order.created", "New order placed")
orderID := capitan.NewStringKey("order_id")
total := capitan.NewFloat64Key("total")
capitan.Emit(ctx, orderCreated,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)
func Replay ¶
Replay re-emits a historical event on the default instance. The event is marked as a replay, preserving its original timestamp and severity. Useful for replaying events from storage for debugging or backfilling.
Example:
e := capitan.NewEvent(orderCreated, capitan.SeverityInfo, storedTimestamp,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)
capitan.Replay(ctx, e)
Types ¶
type BoolKey ¶
type BoolKey = GenericKey[bool]
BoolKey is a Key implementation for bool values.
func NewBoolKey ¶
NewBoolKey creates a BoolKey with the given name.
type BytesKey ¶
type BytesKey = GenericKey[[]byte]
BytesKey is a Key implementation for []byte values.
func NewBytesKey ¶
NewBytesKey creates a BytesKey with the given name.
type Capitan ¶
type Capitan struct {
    // contains filtered or unexported fields
}
Capitan is an event coordination system with per-signal worker goroutines.
Each signal gets its own worker goroutine created lazily on first emission, providing isolation between signals. Slow or panicking listeners on one signal do not affect other signals.
Use New to create isolated instances, or the package-level functions (Emit, Hook, Observe, Shutdown) to use the default singleton.
func New ¶
New creates a new Capitan instance with optional configuration. If no options are provided, sensible defaults are used (bufferSize=16, no panic handler).
func (*Capitan) ApplyConfig ¶
ApplyConfig applies per-signal configuration from a Config. Replaces the entire configuration - signals not in the new config revert to defaults. Supports exact signal names and glob patterns.
Resolution uses specificity rules:
- Exact matches always win over globs
- Among globs, longest pattern wins (more specific)
- Ties: alphabetical order (deterministic)
Only rebuilds workers whose resolved config actually changed.
Example:
cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "order.*":       {BufferSize: 32, MinSeverity: capitan.SeverityInfo},
        "order.created": {BufferSize: 64}, // Exact match overrides glob
    },
}
// c is a *capitan.Capitan, e.g. one created with capitan.New.
if err := c.ApplyConfig(cfg); err != nil {
    log.Fatal(err)
}
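The specificity rules listed above can be illustrated with a standalone sketch. It is not capitan's resolver: `resolvePattern` is a hypothetical helper built on the standard library's path.Match, and the tie-break direction (the alphabetically first pattern wins) is an assumption, since the documentation only states that ties are resolved alphabetically.

```go
package main

import (
    "fmt"
    "path"
    "sort"
)

// resolvePattern is a hypothetical helper, not part of capitan. It picks the
// config key that applies to a signal name: an exact match wins, otherwise the
// longest matching glob, with ties broken alphabetically (assumed: first wins).
func resolvePattern(patterns []string, name string) (string, bool) {
    var globs []string
    for _, p := range patterns {
        if p == name {
            return p, true // exact match always wins
        }
        if ok, err := path.Match(p, name); err == nil && ok {
            globs = append(globs, p)
        }
    }
    if len(globs) == 0 {
        return "", false
    }
    sort.Slice(globs, func(i, j int) bool {
        if len(globs[i]) != len(globs[j]) {
            return len(globs[i]) > len(globs[j]) // longer pattern is more specific
        }
        return globs[i] < globs[j] // deterministic tie-break
    })
    return globs[0], true
}

func main() {
    patterns := []string{"order.*", "order.c*", "order.created", "*"}
    names := []string{"order.created", "order.canceled", "order.shipped", "user.login"}
    for _, name := range names {
        p, ok := resolvePattern(patterns, name)
        fmt.Println(name, "->", p, ok)
    }
}
```

Here "order.created" resolves to its exact entry, "order.canceled" to "order.c*" (the longest matching glob), "order.shipped" to "order.*", and "user.login" to "*".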
func (*Capitan) Drain ¶
Drain blocks until all currently queued events have been processed. Unlike Shutdown, workers remain active after draining. Returns an error if the context is canceled before drain completes.
func (*Capitan) Emit ¶
Emit dispatches an event with Info severity (default). Queues the event for asynchronous processing by the signal's worker goroutine. Creates a worker goroutine lazily on first emission to this signal. Silently drops events if no listeners are registered for the signal. If the context is canceled before the event can be queued, the event is dropped.
func (*Capitan) Hook ¶
func (c *Capitan) Hook(signal Signal, callback EventCallback) *Listener
Hook registers a callback for the given signal. Returns a Listener that can be closed to unregister. Returns nil if MaxListeners is configured and the limit is reached.
func (*Capitan) HookOnce ¶
func (c *Capitan) HookOnce(signal Signal, callback EventCallback) *Listener
HookOnce registers a callback that fires only once, then automatically unregisters. Returns a Listener that can be closed early to prevent the callback from firing.
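The fire-once behavior, including the ability to cancel before the first event, can be sketched with sync.Once. This is an illustration under assumptions, not capitan's code: `hookOnce`, `fire`, and `cancel` are hypothetical names, and events are plain strings.

```go
package main

import (
    "fmt"
    "sync"
)

// hookOnce is a hypothetical helper. fire delivers an event to callback at most
// once; cancel, if called first, prevents callback from ever running.
func hookOnce(callback func(event string)) (fire func(event string), cancel func()) {
    var once sync.Once
    fire = func(event string) { once.Do(func() { callback(event) }) }
    cancel = func() { once.Do(func() {}) } // consume the Once without calling callback
    return fire, cancel
}

func main() {
    fire, _ := hookOnce(func(event string) { fmt.Println("got", event) })
    fire("ORD-1") // runs the callback
    fire("ORD-2") // ignored: the callback already fired

    fire2, cancel := hookOnce(func(event string) { fmt.Println("never printed", event) })
    cancel()
    fire2("ORD-3") // ignored: canceled before the first event
}
```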
func (*Capitan) IsShutdown ¶
IsShutdown reports whether Shutdown has been called on this instance.
func (*Capitan) Observe ¶
func (c *Capitan) Observe(callback EventCallback, signals ...Signal) *Observer
Observe registers a callback that receives events from multiple signals. If signals are provided, only those signals are observed (whitelist). If no signals are provided, all signals are observed, both existing and future ones. Returns an Observer that can be closed to unregister all listeners.
func (*Capitan) Replay ¶
Replay re-emits a historical event, preserving its original timestamp and severity. The event is marked as a replay, accessible via Event.IsReplay(). Replay events are processed synchronously and are not pooled.
Use NewEvent to construct events from stored data:
e := capitan.NewEvent(signal, severity, timestamp, fields...)
c.Replay(ctx, e)
func (*Capitan) Shutdown ¶
func (c *Capitan) Shutdown()
Shutdown gracefully stops all worker goroutines, draining pending events. Safe to call multiple times; subsequent calls are no-ops.
type Config ¶
type Config struct {
    // Signals maps signal names or glob patterns to their configuration.
    // Glob patterns use path.Match syntax: *, ?, [...]
    Signals map[string]SignalConfig `json:"signals"`
}
Config is the serializable configuration for per-signal settings. Supports exact signal names and glob patterns (e.g., "order.*").
type ConfigError ¶
ConfigError represents a validation error in the configuration.
func (*ConfigError) Error ¶
func (e *ConfigError) Error() string
type DropPolicy ¶
type DropPolicy string
DropPolicy defines behavior when the event buffer is full.
const (
    // DropPolicyBlock waits for space in the buffer (default behavior).
    DropPolicyBlock DropPolicy = "block"
    // DropPolicyDropNewest drops incoming events if the buffer is full.
    DropPolicyDropNewest DropPolicy = "drop_newest"
)
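The difference between the two policies can be shown with a buffered channel. The sketch below is an illustration only: `policy` and `enqueue` are hypothetical stand-ins, and nothing here is taken from capitan's internals.

```go
package main

import "fmt"

// policy mirrors the two documented DropPolicy values for illustration only.
type policy string

const (
    block      policy = "block"
    dropNewest policy = "drop_newest"
)

// enqueue is a hypothetical helper. It reports whether the event was queued.
func enqueue(queue chan string, event string, p policy) bool {
    if p == dropNewest {
        select {
        case queue <- event:
            return true
        default:
            return false // buffer full: the incoming event is discarded
        }
    }
    queue <- event // block: wait until a consumer frees a slot
    return true
}

func main() {
    queue := make(chan string, 2)
    for _, ev := range []string{"a", "b", "c"} {
        fmt.Println(ev, "queued:", enqueue(queue, ev, dropNewest))
    }
    fmt.Println("buffered:", len(queue))
}
```

With a buffer of two and no consumer, the first two events are queued and the third is rejected. Under the blocking policy the third call would instead wait until a consumer received from the channel.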
type DurationKey ¶
type DurationKey = GenericKey[time.Duration]
DurationKey is a Key implementation for time.Duration values.
func NewDurationKey ¶
func NewDurationKey(name string) DurationKey
NewDurationKey creates a DurationKey with the given name.
type ErrorKey ¶
type ErrorKey = GenericKey[error]
ErrorKey is a Key implementation for error values.
func NewErrorKey ¶
NewErrorKey creates an ErrorKey with the given name.
type Event ¶
type Event struct {
    // contains filtered or unexported fields
}
Event represents a signal emission with typed fields. Events are immutable after creation - all fields are read-only.
func NewEvent ¶
NewEvent creates an Event for replay purposes. Unlike internally emitted events, this event is not pooled. Use this to construct events from stored data for replay.
Example:
e := capitan.NewEvent(orderCreated, capitan.SeverityInfo, storedTimestamp,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)
capitan.Replay(ctx, e)
func (*Event) Clone ¶
Clone creates a deep copy of the event that is safe to retain. Use this when you need to store or pass an event beyond the listener callback, as pooled events are recycled after all listeners complete.
func (*Event) Context ¶
Context returns the context passed at emission time. Used for cancellation checks, timeouts, and request-scoped values.
func (Event) Fields ¶
Fields returns all fields as a slice. Returns a defensive copy; modifications don't affect the event.
type EventCallback ¶
EventCallback is a function that handles an Event. The context is inherited from the Emit call and can be used for cancellation, timeouts, and accessing request-scoped values. Handlers are responsible for their own error handling and logging. Events must not be modified by listeners.
type Field ¶
type Field interface {
    // Variant returns the discriminator for this field's concrete type.
    Variant() Variant
    // Key returns the semantic identifier for this field.
    Key() Key
    // Value returns the underlying value as any.
    Value() any
}
Field represents a typed value with semantic meaning in an Event. Library authors can implement custom Field types while maintaining type safety. Use type assertions to access concrete field types and their typed accessor methods.
type Float32Key ¶
type Float32Key = GenericKey[float32]
Float32Key is a Key implementation for float32 values.
func NewFloat32Key ¶
func NewFloat32Key(name string) Float32Key
NewFloat32Key creates a Float32Key with the given name.
type Float64Key ¶
type Float64Key = GenericKey[float64]
Float64Key is a Key implementation for float64 values.
func NewFloat64Key ¶
func NewFloat64Key(name string) Float64Key
NewFloat64Key creates a Float64Key with the given name.
type GenericField ¶
type GenericField[T any] struct {
    // contains filtered or unexported fields
}
GenericField is a generic implementation of Field for typed values.
func (GenericField[T]) Key ¶
func (f GenericField[T]) Key() Key
Key returns the semantic identifier for this field.
func (GenericField[T]) Value ¶
func (f GenericField[T]) Value() any
Value returns the underlying value as any.
func (GenericField[T]) Variant ¶
func (f GenericField[T]) Variant() Variant
Variant returns the discriminator for this field's type.
type GenericKey ¶
type GenericKey[T any] struct {
    // contains filtered or unexported fields
}
GenericKey is a Key implementation for any type T. All built-in key types (StringKey, IntKey, etc.) are aliases of GenericKey[T].
func NewKey ¶
func NewKey[T any](name string, variant Variant) GenericKey[T]
NewKey creates a GenericKey for any type T with the given name and variant. Use a namespaced variant string to avoid collisions (e.g., "myapp.OrderInfo").
Example:
type OrderInfo struct { ID string; Total float64 }
orderKey := capitan.NewKey[OrderInfo]("order", "myapp.OrderInfo")
capitan.Emit(ctx, sig, orderKey.Field(OrderInfo{ID: "123", Total: 99.99}))
func (GenericKey[T]) ExtractFromFields ¶
func (k GenericKey[T]) ExtractFromFields(fields []Field) T
ExtractFromFields extracts the typed value for this key from a field slice. Returns the value if present, or the zero value if not found or wrong type.
This method is primarily useful in tests when working with captured events, where you have a []Field slice rather than an *Event pointer.
Example:
capture := capitantesting.NewEventCapture()
c.Hook(signal, capture.Handler())
// ... emit events ...
events := capture.Events()
userID := userKey.ExtractFromFields(events[0].Fields)
func (GenericKey[T]) Field ¶
func (k GenericKey[T]) Field(value T) Field
Field creates a GenericField with this key and the given value.
func (GenericKey[T]) From ¶
func (k GenericKey[T]) From(e *Event) (T, bool)
From extracts the typed value for this key from the event. Returns the value and true if present, or zero value and false if not present or wrong type.
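The "value and true, or zero value and false" contract follows the comma-ok idiom. A minimal sketch of how a generic key can provide it is shown below; `key` and `from` are hypothetical miniatures that read from a plain map rather than an *Event, and are not capitan's GenericKey.

```go
package main

import "fmt"

// key is a hypothetical miniature of a typed key, not capitan's GenericKey.
type key[T any] struct{ name string }

// from looks up the key in fields and asserts the stored value to T.
// A missing entry or a value of another type yields the zero value and false.
func (k key[T]) from(fields map[string]any) (T, bool) {
    v, ok := fields[k.name].(T)
    return v, ok
}

func main() {
    orderID := key[string]{name: "order_id"}
    total := key[float64]{name: "total"}
    fields := map[string]any{"order_id": "ORD-123", "total": "not a number"}

    id, ok := orderID.from(fields)
    fmt.Println(id, ok) // present with the right type

    amount, ok := total.from(fields)
    fmt.Println(amount, ok) // wrong type: zero value and false
}
```

Because the type parameter is fixed when the key is created, callers get a value of the right static type without writing a type assertion themselves.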
func (GenericKey[T]) Name ¶
func (k GenericKey[T]) Name() string
Name returns the semantic identifier.
func (GenericKey[T]) Variant ¶
func (k GenericKey[T]) Variant() Variant
Variant returns the type constraint.
type Int32Key ¶
type Int32Key = GenericKey[int32]
Int32Key is a Key implementation for int32 values.
func NewInt32Key ¶
NewInt32Key creates an Int32Key with the given name.
type Int64Key ¶
type Int64Key = GenericKey[int64]
Int64Key is a Key implementation for int64 values.
func NewInt64Key ¶
NewInt64Key creates an Int64Key with the given name.
type Key ¶
type Key interface {
    // Name returns the semantic identifier for this key.
    Name() string
    // Variant returns the type constraint for this key.
    Variant() Variant
}
Key represents a typed semantic identifier for a field. Each Key implementation is bound to a specific Variant, ensuring type safety.
type Listener ¶
type Listener struct {
    // contains filtered or unexported fields
}
Listener represents an active subscription to a signal. Call Close() to unregister the listener and prevent further callbacks.
func Hook ¶
func Hook(signal Signal, callback EventCallback) *Listener
Hook registers a callback for the given signal on the default instance. Returns a Listener that can be closed to unregister.
Example:
orderCreated := capitan.NewSignal("order.created", "New order placed")
orderID := capitan.NewStringKey("order_id")
listener := capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    id, _ := orderID.From(e)
    fmt.Printf("Order %s created\n", id)
})
defer listener.Close()
func HookOnce ¶
func HookOnce(signal Signal, callback EventCallback) *Listener
HookOnce registers a callback that fires only once, then automatically unregisters. Returns a Listener that can be closed early to prevent the callback from firing.
Example:
listener := capitan.HookOnce(orderCreated, func(ctx context.Context, e *capitan.Event) {
    // This handler runs at most once
    fmt.Println("First order received!")
})
type Observer ¶
type Observer struct {
    // contains filtered or unexported fields
}
Observer represents a dynamic subscription that receives events from multiple signals.
Unlike Listener, which subscribes to a single signal, Observer can watch all signals or a whitelist of specific signals. Observers automatically attach to new signals as they are created, making them suitable for cross-cutting concerns like logging.
Call Close to unregister all underlying listeners and stop receiving events.
func Observe ¶
func Observe(callback EventCallback, signals ...Signal) *Observer
Observe registers a callback for all signals on the default instance.
If signals are provided, only those signals will be observed (whitelist mode). If no signals are provided, all signals will be observed including future ones.
Returns an Observer that can be closed to unregister all listeners.
Example (logging all events):
observer := capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    log.Printf("[%s] %s: %s",
        e.Severity(),
        e.Signal().Name(),
        e.Signal().Description())
})
defer observer.Close()
Example (whitelist specific signals):
observer := capitan.Observe(handler, orderCreated, orderShipped, orderCanceled)
type Option ¶
type Option func(*Capitan)
Option configures a Capitan instance.
func WithBufferSize ¶
WithBufferSize sets the event queue buffer size for each signal's worker. Default is 16. Larger buffers reduce backpressure but increase memory usage.
func WithPanicHandler ¶
func WithPanicHandler(handler PanicHandler) Option
WithPanicHandler sets a callback to be invoked when a listener panics. The handler receives the signal and the recovered panic value. By default, panics are recovered silently to prevent system crashes.
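Recovering a listener panic and handing it to a callback is typically done with defer and recover. The sketch below shows that pattern in isolation. `safeCall` is a hypothetical wrapper whose handler takes a string signal name, which is a simplification of the documented PanicHandler; it is not capitan's code.

```go
package main

import "fmt"

// safeCall is a hypothetical wrapper, not capitan's code. It runs callback and
// routes any panic to handler instead of crashing the program. With a nil
// handler the panic is still recovered, just silently.
func safeCall(signal string, callback func(), handler func(signal string, recovered any)) {
    defer func() {
        if r := recover(); r != nil && handler != nil {
            handler(signal, r)
        }
    }()
    callback()
}

func main() {
    handler := func(signal string, recovered any) {
        fmt.Printf("listener for %q panicked: %v\n", signal, recovered)
    }
    safeCall("order.created", func() { panic("boom") }, handler)
    safeCall("order.created", func() { fmt.Println("next listener still runs") }, handler)
}
```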
func WithSyncMode ¶
func WithSyncMode() Option
WithSyncMode enables synchronous event processing for testing. When enabled, Emit() calls listeners directly instead of queueing to workers. This eliminates timing dependencies and makes tests deterministic. Should only be used in tests, not production code.
type PanicHandler ¶
PanicHandler is called when a listener panics during event processing. Receives the signal being processed and the recovered panic value.
type Severity ¶
type Severity string
Severity represents the logging severity level of an event. Compatible with standard logging conventions and OpenTelemetry.
const (
    // SeverityDebug is for development and troubleshooting information.
    SeverityDebug Severity = "DEBUG"
    // SeverityInfo is for normal operational messages (default for Emit).
    SeverityInfo Severity = "INFO"
    // SeverityWarn is for warning conditions that may need attention.
    SeverityWarn Severity = "WARN"
    // SeverityError is for error conditions requiring immediate action.
    SeverityError Severity = "ERROR"
)
Severity levels for events.
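Because Severity is a string type, comparing two levels with < would order them lexically ("ERROR" sorts before "INFO"), so any threshold filter needs an explicit ranking. The sketch below assumes the conventional order DEBUG < INFO < WARN < ERROR; `severity`, `rank`, and `passes` are hypothetical names, and capitan's own comparison is not shown in this documentation.

```go
package main

import "fmt"

// severity is a local stand-in for a string-based severity type.
type severity string

// rank is an assumed ordering: DEBUG < INFO < WARN < ERROR. Unknown or empty
// values map to 0 and are therefore treated like DEBUG.
var rank = map[severity]int{"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}

// passes reports whether an event at level s meets the minimum level.
func passes(s, minimum severity) bool {
    return rank[s] >= rank[minimum]
}

func main() {
    fmt.Println(severity("ERROR") < severity("INFO")) // lexical comparison: misleading
    fmt.Println(passes("ERROR", "INFO"))              // ranked comparison
    fmt.Println(passes("DEBUG", "INFO"))
    fmt.Println(passes("DEBUG", "")) // empty minimum filters nothing
}
```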
type Signal ¶
type Signal struct {
    // contains filtered or unexported fields
}
Signal represents an event type identifier used for routing events to listeners.
func NewSignal ¶
NewSignal creates a new Signal with the given name and description. The description is used as the human-readable message when converting to logs.
func (Signal) Description ¶
Description returns the signal's human-readable description.
type SignalConfig ¶
type SignalConfig struct {
    // BufferSize sets the event queue size for this signal.
    // Default: instance-level bufferSize (typically 16).
    BufferSize int `json:"bufferSize,omitempty"`
    // Disabled suppresses all events for this signal when true.
    Disabled bool `json:"disabled,omitempty"`
    // MinSeverity filters events below this severity level.
    // Events with severity < MinSeverity are silently dropped.
    MinSeverity Severity `json:"minSeverity,omitempty"`
    // MaxListeners caps the number of listeners for this signal.
    // Hook() returns nil when the limit is reached. 0 = unlimited.
    MaxListeners int `json:"maxListeners,omitempty"`
    // DropPolicy controls behavior when the event buffer is full.
    // Default: DropPolicyBlock (wait for space).
    DropPolicy DropPolicy `json:"dropPolicy,omitempty"`
    // RateLimit sets the maximum events per second for this signal.
    // Events exceeding the rate are silently dropped. 0 = unlimited.
    RateLimit float64 `json:"rateLimit,omitempty"`
    // BurstSize sets the burst allowance above the rate limit.
    // Allows short bursts before rate limiting kicks in.
    BurstSize int `json:"burstSize,omitempty"`
}
SignalConfig holds per-signal configuration options. Zero values indicate "use default" for all fields.
type Stats ¶
type Stats struct {
    // ActiveWorkers is the number of worker goroutines currently running.
    ActiveWorkers int
    // SignalCount is the total number of unique signals that have been registered.
    SignalCount int
    // DroppedEvents is the total number of events dropped due to no listeners.
    DroppedEvents uint64
    // QueueDepths maps each signal to the number of events queued in its buffer.
    QueueDepths map[Signal]int
    // ListenerCounts maps each signal to the number of registered listeners.
    ListenerCounts map[Signal]int
    // EmitCounts maps each signal to the total number of times it has been emitted.
    EmitCounts map[Signal]uint64
    // FieldSchemas maps each signal to the keys of fields from its first emission.
    // This provides a schema of what fields are available on each signal.
    FieldSchemas map[Signal][]Key
}
Stats provides runtime metrics for a Capitan instance.
type StringKey ¶
type StringKey = GenericKey[string]
StringKey is a Key implementation for string values.
func NewStringKey ¶
NewStringKey creates a StringKey with the given name.
type TimeKey ¶
type TimeKey = GenericKey[time.Time]
TimeKey is a Key implementation for time.Time values.
func NewTimeKey ¶
NewTimeKey creates a TimeKey with the given name.
type Uint32Key ¶
type Uint32Key = GenericKey[uint32]
Uint32Key is a Key implementation for uint32 values.
func NewUint32Key ¶
NewUint32Key creates a Uint32Key with the given name.
type Uint64Key ¶
type Uint64Key = GenericKey[uint64]
Uint64Key is a Key implementation for uint64 values.
func NewUint64Key ¶
NewUint64Key creates a Uint64Key with the given name.
type UintKey ¶
type UintKey = GenericKey[uint]
UintKey is a Key implementation for uint values.
func NewUintKey ¶
NewUintKey creates a UintKey with the given name.
type Variant ¶
type Variant string
Variant is a discriminator for the Field interface implementation type. Used for runtime type identification when type assertions are needed.
const (
    VariantString   Variant = "string"
    VariantInt      Variant = "int"
    VariantInt32    Variant = "int32"
    VariantInt64    Variant = "int64"
    VariantUint     Variant = "uint"
    VariantUint32   Variant = "uint32"
    VariantUint64   Variant = "uint64"
    VariantFloat32  Variant = "float32"
    VariantFloat64  Variant = "float64"
    VariantBool     Variant = "bool"
    VariantTime     Variant = "time.Time"
    VariantDuration Variant = "time.Duration"
    VariantBytes    Variant = "[]byte"
    VariantError    Variant = "error"
)
Built-in variants for primitive types. Custom types should use namespaced variant strings (e.g., "myapp.OrderInfo") to avoid collisions.