capitan

Version: v1.0.2
Published: Mar 19, 2026 | License: MIT | Imports: 4 | Imported by: 11

README

capitan

Type-safe event coordination for Go with zero dependencies.

Emit events with typed fields, hook listeners, and let capitan handle the rest with async processing and backpressure.

Send a Signal, Listen Anywhere

Events carry typed fields with compile-time safety.

// Define typed keys
orderID := capitan.NewStringKey("order_id")
total   := capitan.NewFloat64Key("total")

// Define a signal
orderCreated := capitan.NewSignal("order.created", "New order placed")

// Emit with typed fields
capitan.Emit(ctx, orderCreated,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)

Each signal queues to its own worker — isolated, async, backpressure-aware.

// Hook a listener — extract typed values directly
capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    id, _ := orderID.From(e)      // string
    amount, _ := total.From(e)    // float64
    process(id, amount)
})

// Observe all signals — unified visibility across the system
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    slog.Info("event", "signal", e.Signal().Name()) // log/slog from the standard library
})

Type-safe at the edges. Async and isolated in between.

Installation

go get github.com/zoobz-io/capitan

Requires Go 1.24+.

Quick Start

package main

import (
    "context"
    "fmt"
    "github.com/zoobz-io/capitan"
)

// Define signals and keys as package-level variables
var (
    orderCreated = capitan.NewSignal("order.created", "New order placed")
    orderID      = capitan.NewStringKey("order_id")
    total        = capitan.NewFloat64Key("total")
)

func main() {
    // Hook a listener
    capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
        id, _ := orderID.From(e)
        amount, _ := total.From(e)
        fmt.Printf("Order %s: $%.2f\n", id, amount)
    })

    // Emit an event (async with backpressure)
    capitan.Emit(context.Background(), orderCreated,
        orderID.Field("ORDER-123"),
        total.Field(99.99),
    )

    // Gracefully drain pending events
    capitan.Shutdown()
}

Capabilities

Feature            | Description                                                  | Docs
Typed Fields       | Built-in keys for primitives; NewKey[T] for any custom type  | Fields
Per-Signal Workers | Each signal gets its own goroutine and buffered queue        | Architecture
Observers          | Cross-cutting handlers that see all signals (or a whitelist) | Concepts
Configuration      | Buffer sizes, rate limits, drop policies, panic handlers     | Configuration
Graceful Shutdown  | Drain pending events before exit                             | Best Practices
Testing Utilities  | Sync mode, event capture, isolated instances                 | Testing

Why capitan?

  • Type-safe — Typed fields with compile-time safety, including custom structs
  • Zero dependencies — Standard library only
  • Async by default — Per-signal workers with backpressure
  • Isolated — Slow listeners don't affect other signals
  • Panic-safe — Listener panics recovered, system stays running
  • Testable — Sync mode and capture utilities for deterministic tests

Decoupled Coordination

Capitan enables a pattern: packages emit, concerned parties listen, services observe.

Your domain packages emit events when meaningful things happen. Other packages hook the signals they care about. Service-level concerns — audit trails, structured logging, metrics collection — observe everything through a unified stream.

// In your order package
capitan.Emit(ctx, orderCreated, orderID.Field(id), total.Field(amount))

// In your notification package
capitan.Hook(orderCreated, sendConfirmationEmail)

// In your audit service
capitan.Observe(writeToAuditLog)

Three packages, one event flow, zero direct imports between them.

Documentation

Full documentation is available in the docs/ directory:

Learn
Guides
  • Configuration — Buffer sizes, panic handlers, runtime metrics
  • Context — Request tracing, cancellation, timeouts
  • Errors — Error propagation, severity levels, retry patterns
  • Testing — Sync mode, event capture, isolated instances
  • Best Practices — Signal design, listener lifecycle, performance
  • Persistence — Event storage and replay
Integrations
  • aperture — OpenTelemetry observability
  • herald — Message broker bridge (Kafka, NATS)
  • ago — Workflow orchestration (Experimental)
Reference

Contributing

See CONTRIBUTING.md for guidelines. Run make help for available commands.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package capitan provides type-safe event coordination for Go with zero dependencies.

At its core, capitan offers three operations: Emit events with typed fields, Hook listeners to specific signals, and Observe all signals. Events are processed asynchronously with per-signal worker goroutines for isolation and performance.

Context Support: All events carry a context.Context for cancellation, timeouts, and request-scoped values. Canceled contexts prevent event queueing and processing.

Quick example:

sig := capitan.NewSignal("order.created", "New order has been created")
orderID := capitan.NewStringKey("order_id")

capitan.Hook(sig, func(ctx context.Context, e *capitan.Event) {
    id, _ := orderID.From(e)
    // Process order...
})

capitan.Emit(context.Background(), sig, orderID.Field("ORDER-123"))
capitan.Shutdown() // Drain pending events

See https://github.com/zoobz-io/capitan for full documentation.

Functions

func ApplyConfig

func ApplyConfig(cfg Config) error

ApplyConfig applies configuration on the default instance.

func Configure

func Configure(opts ...Option)

Configure sets options for the default Capitan instance. Must be called before any module-level functions (Hook, Emit, Observe, Shutdown). Subsequent calls have no effect once the default instance is created.

func Debug

func Debug(ctx context.Context, signal Signal, fields ...Field)

Debug dispatches an event with Debug severity on the default instance.

func Emit

func Emit(ctx context.Context, signal Signal, fields ...Field)

Emit dispatches an event with Info severity on the default instance.

Events are queued asynchronously and processed by per-signal worker goroutines. If no listeners are registered for the signal, the event is dropped silently. If the context is canceled before the event can be queued, the event is dropped.

Example:

orderCreated := capitan.NewSignal("order.created", "New order placed")
orderID := capitan.NewStringKey("order_id")
total := capitan.NewFloat64Key("total")

capitan.Emit(ctx, orderCreated,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)

func Error

func Error(ctx context.Context, signal Signal, fields ...Field)

Error dispatches an event with Error severity on the default instance.

func Info

func Info(ctx context.Context, signal Signal, fields ...Field)

Info dispatches an event with Info severity on the default instance.

func Replay

func Replay(ctx context.Context, e *Event)

Replay re-emits a historical event on the default instance. The event is marked as a replay, preserving its original timestamp and severity. Useful for replaying events from storage for debugging or backfilling.

Example:

e := capitan.NewEvent(orderCreated, capitan.SeverityInfo, storedTimestamp,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)
capitan.Replay(ctx, e)

func Shutdown

func Shutdown()

Shutdown gracefully stops all worker goroutines on the default instance.

func Warn

func Warn(ctx context.Context, signal Signal, fields ...Field)

Warn dispatches an event with Warn severity on the default instance.

Types

type BoolKey

type BoolKey = GenericKey[bool]

BoolKey is a Key implementation for bool values.

func NewBoolKey

func NewBoolKey(name string) BoolKey

NewBoolKey creates a BoolKey with the given name.

type BytesKey

type BytesKey = GenericKey[[]byte]

BytesKey is a Key implementation for []byte values.

func NewBytesKey

func NewBytesKey(name string) BytesKey

NewBytesKey creates a BytesKey with the given name.

type Capitan

type Capitan struct {
	// contains filtered or unexported fields
}

Capitan is an event coordination system with per-signal worker goroutines.

Each signal gets its own worker goroutine created lazily on first emission, providing isolation between signals. Slow or panicking listeners on one signal do not affect other signals.

Use New to create isolated instances, or the module-level functions (Emit, Hook, Observe, Shutdown) to use the default singleton.
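The per-signal worker model described above can be pictured with a small standard-library sketch. This is not capitan's implementation: the bus type, its methods, and the string payloads are hypothetical names chosen for illustration, and only the buffer size of 16 is taken from the documented default.

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a toy dispatcher: one buffered queue and one goroutine per signal name.
type bus struct {
	mu      sync.Mutex
	queues  map[string]chan string
	wg      sync.WaitGroup
	handler func(signal, payload string)
}

func newBus(handler func(signal, payload string)) *bus {
	return &bus{queues: make(map[string]chan string), handler: handler}
}

// emit queues payload for signal, starting that signal's worker on first use.
func (b *bus) emit(signal, payload string) {
	b.mu.Lock()
	q, ok := b.queues[signal]
	if !ok {
		q = make(chan string, 16) // mirrors the documented default buffer size
		b.queues[signal] = q
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for p := range q {
				b.handler(signal, p)
			}
		}()
	}
	b.mu.Unlock()
	q <- payload // blocks only when this signal's queue is full
}

// workerCount reports how many signals have a worker.
func (b *bus) workerCount() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.queues)
}

// shutdown closes every queue and waits for the workers to drain them.
// Calling emit after shutdown is not supported in this sketch.
func (b *bus) shutdown() {
	b.mu.Lock()
	for _, q := range b.queues {
		close(q)
	}
	b.mu.Unlock()
	b.wg.Wait()
}

func main() {
	var mu sync.Mutex
	seen := map[string][]string{}
	b := newBus(func(signal, payload string) {
		mu.Lock()
		seen[signal] = append(seen[signal], payload)
		mu.Unlock()
	})
	b.emit("order.created", "ORD-1")
	b.emit("order.created", "ORD-2")
	b.emit("order.shipped", "ORD-1")
	b.shutdown()
	fmt.Println(b.workerCount(), seen["order.created"], seen["order.shipped"])
}
```

Because each signal has its own queue, a slow handler for one signal name can only fill that signal's buffer. Events for a single signal are handled in the order they were queued.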

func Default

func Default() *Capitan

Default returns the default Capitan instance.

func New

func New(opts ...Option) *Capitan

New creates a new Capitan instance with optional configuration. If no options are provided, sensible defaults are used (bufferSize=16, no panic handler).

func (*Capitan) ApplyConfig

func (c *Capitan) ApplyConfig(cfg Config) error

ApplyConfig applies per-signal configuration from a Config. Replaces the entire configuration - signals not in the new config revert to defaults. Supports exact signal names and glob patterns.

Resolution uses specificity rules:

  • Exact matches always win over globs
  • Among globs, longest pattern wins (more specific)
  • Ties: alphabetical order (deterministic)

Only rebuilds workers whose resolved config actually changed.

Example:

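cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "order.*":       {BufferSize: 32, MinSeverity: capitan.SeverityInfo},
        "order.created": {BufferSize: 64}, // Exact match overrides glob
    },
}
// c is a *capitan.Capitan, e.g. from capitan.New()
if err := c.ApplyConfig(cfg); err != nil {
    // Handle the invalid configuration
}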
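The specificity rules above can be sketched with the standard library's path.Match, which the Config documentation names as the glob syntax. The resolve function is a hypothetical helper, not part of capitan, and it assumes that "alphabetical order" means the pattern that sorts first as a plain string wins a tie.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// resolve returns the pattern that applies to name: an exact match if one
// exists, otherwise the longest matching glob, with ties broken by string order.
func resolve(patterns []string, name string) (string, bool) {
	var globs []string
	for _, p := range patterns {
		if p == name {
			return p, true // exact match always wins
		}
		if ok, err := path.Match(p, name); err == nil && ok {
			globs = append(globs, p)
		}
	}
	if len(globs) == 0 {
		return "", false
	}
	sort.Slice(globs, func(i, j int) bool {
		if len(globs[i]) != len(globs[j]) {
			return len(globs[i]) > len(globs[j]) // longer pattern is more specific
		}
		return globs[i] < globs[j] // deterministic tie-break (assumed direction)
	})
	return globs[0], true
}

func main() {
	patterns := []string{"*", "order.*", "order.created"}
	for _, name := range []string{"order.created", "order.shipped", "user.login"} {
		p, _ := resolve(patterns, name)
		fmt.Printf("%s -> %s\n", name, p)
	}
}
```

With these three patterns, order.created resolves to its exact entry, order.shipped falls back to order.*, and user.login is only covered by the catch-all.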

func (*Capitan) Debug

func (c *Capitan) Debug(ctx context.Context, signal Signal, fields ...Field)

Debug dispatches an event with Debug severity.

func (*Capitan) Drain

func (c *Capitan) Drain(ctx context.Context) error

Drain blocks until all currently queued events have been processed. Unlike Shutdown, workers remain active after draining. Returns an error if the context is canceled before drain completes.

func (*Capitan) Emit

func (c *Capitan) Emit(ctx context.Context, signal Signal, fields ...Field)

Emit dispatches an event with Info severity (default). Queues the event for asynchronous processing by the signal's worker goroutine. Creates a worker goroutine lazily on first emission to this signal. Silently drops events if no listeners are registered for the signal. If the context is canceled before the event can be queued, the event is dropped.

func (*Capitan) Error

func (c *Capitan) Error(ctx context.Context, signal Signal, fields ...Field)

Error dispatches an event with Error severity.

func (*Capitan) Hook

func (c *Capitan) Hook(signal Signal, callback EventCallback) *Listener

Hook registers a callback for the given signal. Returns a Listener that can be closed to unregister. Returns nil if MaxListeners is configured and the limit is reached.

func (*Capitan) HookOnce

func (c *Capitan) HookOnce(signal Signal, callback EventCallback) *Listener

HookOnce registers a callback that fires only once, then automatically unregisters. Returns a Listener that can be closed early to prevent the callback from firing.
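The at-most-once guarantee can be illustrated with sync.Once from the standard library. This sketch covers only the "fires once" half of the behavior. It does not unregister anything, and onceOnly is a hypothetical helper rather than a capitan function.

```go
package main

import (
	"fmt"
	"sync"
)

// onceOnly wraps callback so that only the first invocation runs it,
// even if several goroutines call the wrapper concurrently.
func onceOnly(callback func(payload string)) func(payload string) {
	var once sync.Once
	return func(payload string) {
		once.Do(func() { callback(payload) })
	}
}

func main() {
	handler := onceOnly(func(payload string) { fmt.Println("first order:", payload) })
	handler("ORD-1") // prints "first order: ORD-1"
	handler("ORD-2") // ignored
}
```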

func (*Capitan) Info

func (c *Capitan) Info(ctx context.Context, signal Signal, fields ...Field)

Info dispatches an event with Info severity.

func (*Capitan) IsShutdown

func (c *Capitan) IsShutdown() bool

IsShutdown reports whether Shutdown has been called on this instance.

func (*Capitan) Observe

func (c *Capitan) Observe(callback EventCallback, signals ...Signal) *Observer

Observe registers a callback across signals. If signals are provided, only those signals are observed (whitelist); if none are provided, all signals are observed. The observer receives events from both existing and future signals. Returns an Observer that can be closed to unregister all listeners.

func (*Capitan) Replay

func (c *Capitan) Replay(ctx context.Context, e *Event)

Replay re-emits a historical event, preserving its original timestamp and severity. The event is marked as a replay, accessible via Event.IsReplay(). Replay events are processed synchronously and are not pooled.

Use NewEvent to construct events from stored data:

e := capitan.NewEvent(signal, severity, timestamp, fields...)
c.Replay(ctx, e)

func (*Capitan) Shutdown

func (c *Capitan) Shutdown()

Shutdown gracefully stops all worker goroutines, draining pending events. Safe to call multiple times; subsequent calls are no-ops.

func (*Capitan) Stats

func (c *Capitan) Stats() Stats

Stats returns runtime metrics for the Capitan instance. Provides visibility into active workers, queue depths, listener counts, emit counts, dropped events, and field schemas.

func (*Capitan) Warn

func (c *Capitan) Warn(ctx context.Context, signal Signal, fields ...Field)

Warn dispatches an event with Warn severity.

type Config

type Config struct {
	// Signals maps signal names or glob patterns to their configuration.
	// Glob patterns use path.Match syntax: *, ?, [...]
	Signals map[string]SignalConfig `json:"signals"`
}

Config is the serializable configuration for per-signal settings. Supports exact signal names and glob patterns (e.g., "order.*").

func (Config) Validate

func (c Config) Validate() error

Validate checks the configuration for errors. Implements the Validator interface for flux compatibility.

type ConfigError

type ConfigError struct {
	Pattern string
	Field   string
	Reason  string
}

ConfigError represents a validation error in the configuration.

func (*ConfigError) Error

func (e *ConfigError) Error() string

type DropPolicy

type DropPolicy string

DropPolicy defines behavior when the event buffer is full.

const (
	// DropPolicyBlock waits for space in the buffer (default behavior).
	DropPolicyBlock DropPolicy = "block"
	// DropPolicyDropNewest drops incoming events if the buffer is full.
	DropPolicyDropNewest DropPolicy = "drop_newest"
)
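The difference between the two policies comes down to how a send on a full buffered channel is handled. The following is a minimal sketch under that assumption. The policy type, constant names, and enqueue function are illustrative stand-ins, not capitan internals.

```go
package main

import "fmt"

type policy string

const (
	policyBlock      policy = "block"
	policyDropNewest policy = "drop_newest"
)

// enqueue places v on queue according to p and reports whether it was queued.
func enqueue(queue chan int, v int, p policy) bool {
	if p == policyDropNewest {
		select {
		case queue <- v:
			return true
		default:
			return false // buffer full: the incoming event is discarded
		}
	}
	queue <- v // buffer full: the caller waits here until a worker makes room
	return true
}

func main() {
	queue := make(chan int, 2)
	fmt.Println(enqueue(queue, 1, policyDropNewest)) // true
	fmt.Println(enqueue(queue, 2, policyDropNewest)) // true
	fmt.Println(enqueue(queue, 3, policyDropNewest)) // false

	go func() { <-queue }()                     // a worker frees one slot
	fmt.Println(enqueue(queue, 4, policyBlock)) // true
}
```

Blocking applies backpressure to the emitter, while drop_newest keeps the emitter fast at the cost of losing events under load.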

type DurationKey

type DurationKey = GenericKey[time.Duration]

DurationKey is a Key implementation for time.Duration values.

func NewDurationKey

func NewDurationKey(name string) DurationKey

NewDurationKey creates a DurationKey with the given name.

type ErrorKey

type ErrorKey = GenericKey[error]

ErrorKey is a Key implementation for error values.

func NewErrorKey

func NewErrorKey(name string) ErrorKey

NewErrorKey creates an ErrorKey with the given name.

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event represents a signal emission with typed fields. Events are immutable after creation - all fields are read-only.

func NewEvent

func NewEvent(signal Signal, severity Severity, timestamp time.Time, fields ...Field) *Event

NewEvent creates an Event for replay purposes. Unlike internally emitted events, this event is not pooled. Use this to construct events from stored data for replay.

Example:

e := capitan.NewEvent(orderCreated, capitan.SeverityInfo, storedTimestamp,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)
capitan.Replay(ctx, e)

func (*Event) Clone

func (e *Event) Clone() *Event

Clone creates a deep copy of the event that is safe to retain. Use this when you need to store or pass an event beyond the listener callback, as pooled events are recycled after all listeners complete.
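Why a retained pointer is unsafe can be shown without a real pool. In this sketch the event type is a hypothetical stand-in, and recycling is simulated by overwriting the original's storage. capitan's actual pooling mechanism is not shown in this documentation.

```go
package main

import "fmt"

// event stands in for a pooled object whose storage is reused between emissions.
type event struct{ fields []string }

// clone copies the slice so the result no longer shares memory with e.
func (e *event) clone() *event {
	return &event{fields: append([]string(nil), e.fields...)}
}

func main() {
	pooled := &event{fields: []string{"order_id=ORD-1"}}

	alias := pooled        // retained pointer: same object
	kept := pooled.clone() // retained copy: independent object

	// Simulate the pool recycling the event for the next emission.
	pooled.fields[0] = "order_id=ORD-2"

	fmt.Println(alias.fields[0]) // order_id=ORD-2
	fmt.Println(kept.fields[0])  // order_id=ORD-1
}
```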

func (*Event) Context

func (e *Event) Context() context.Context

Context returns the context passed at emission time. Used for cancellation checks, timeouts, and request-scoped values.

func (Event) Fields

func (e Event) Fields() []Field

Fields returns all fields as a slice. Returns a defensive copy; modifications don't affect the event.

func (Event) Get

func (e Event) Get(key Key) Field

Get retrieves a field by key, returning nil if not found.

func (*Event) IsReplay

func (e *Event) IsReplay() bool

IsReplay returns true if this event was replayed from storage.

func (*Event) Severity

func (e *Event) Severity() Severity

Severity returns the event's severity level.

func (*Event) Signal

func (e *Event) Signal() Signal

Signal returns the event's signal identifier.

func (*Event) Timestamp

func (e *Event) Timestamp() time.Time

Timestamp returns when the event was created.

type EventCallback

type EventCallback func(context.Context, *Event)

EventCallback is a function that handles an Event. The context is inherited from the Emit call and can be used for cancellation, timeouts, and accessing request-scoped values. Handlers are responsible for their own error handling and logging. Events must not be modified by listeners.

type Field

type Field interface {
	// Variant returns the discriminator for this field's concrete type.
	Variant() Variant

	// Key returns the semantic identifier for this field.
	Key() Key

	// Value returns the underlying value as any.
	Value() any
}

Field represents a typed value with semantic meaning in an Event. Library authors can implement custom Field types while maintaining type safety. Use type assertions to access concrete field types and their typed accessor methods.

type Float32Key

type Float32Key = GenericKey[float32]

Float32Key is a Key implementation for float32 values.

func NewFloat32Key

func NewFloat32Key(name string) Float32Key

NewFloat32Key creates a Float32Key with the given name.

type Float64Key

type Float64Key = GenericKey[float64]

Float64Key is a Key implementation for float64 values.

func NewFloat64Key

func NewFloat64Key(name string) Float64Key

NewFloat64Key creates a Float64Key with the given name.

type GenericField

type GenericField[T any] struct {
	// contains filtered or unexported fields
}

GenericField is a generic implementation of Field for typed values.

func (GenericField[T]) Get

func (f GenericField[T]) Get() T

Get returns the typed value.

func (GenericField[T]) Key

func (f GenericField[T]) Key() Key

Key returns the semantic identifier for this field.

func (GenericField[T]) Value

func (f GenericField[T]) Value() any

Value returns the underlying value as any.

func (GenericField[T]) Variant

func (f GenericField[T]) Variant() Variant

Variant returns the discriminator for this field's type.

type GenericKey

type GenericKey[T any] struct {
	// contains filtered or unexported fields
}

GenericKey is a Key implementation for any type T. All built-in key types (StringKey, IntKey, etc.) are aliases of GenericKey[T].

func NewKey

func NewKey[T any](name string, variant Variant) GenericKey[T]

NewKey creates a GenericKey for any type T with the given name and variant. Use a namespaced variant string to avoid collisions (e.g., "myapp.OrderInfo").

Example:

type OrderInfo struct { ID string; Total float64 }
orderKey := capitan.NewKey[OrderInfo]("order", "myapp.OrderInfo")
capitan.Emit(ctx, sig, orderKey.Field(OrderInfo{ID: "123", Total: 99.99}))

func (GenericKey[T]) ExtractFromFields

func (k GenericKey[T]) ExtractFromFields(fields []Field) T

ExtractFromFields extracts the typed value for this key from a field slice. Returns the value if present, or the zero value if not found or wrong type.

This method is primarily useful in tests when working with captured events, where you have a []Field slice rather than an *Event pointer.

Example:

capture := capitantesting.NewEventCapture()
c.Hook(signal, capture.Handler())
// ... emit events ...
events := capture.Events()
userID := userKey.ExtractFromFields(events[0].Fields)

func (GenericKey[T]) Field

func (k GenericKey[T]) Field(value T) Field

Field creates a GenericField with this key and the given value.

func (GenericKey[T]) From

func (k GenericKey[T]) From(e *Event) (T, bool)

From extracts the typed value for this key from the event. Returns the value and true if present, or zero value and false if not present or wrong type.
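How a generic key gives typed construction and checked extraction can be sketched in a few lines. The typedKey and entry types below are simplified, hypothetical stand-ins for GenericKey and Field. They match on name only and ignore variants, which the real types may handle differently.

```go
package main

import "fmt"

// entry is an untyped name/value pair, as it might be stored on an event.
type entry struct {
	name  string
	value any
}

// typedKey binds a field name to a Go type T at compile time.
type typedKey[T any] struct{ name string }

// Field wraps v; passing a value that is not a T fails to compile.
func (k typedKey[T]) Field(v T) entry { return entry{name: k.name, value: v} }

// From returns the value stored under the key's name, or the zero value and
// false if the name is absent or holds a different type.
func (k typedKey[T]) From(entries []entry) (T, bool) {
	for _, e := range entries {
		if e.name == k.name {
			v, ok := e.value.(T)
			return v, ok
		}
	}
	var zero T
	return zero, false
}

func main() {
	orderID := typedKey[string]{name: "order_id"}
	total := typedKey[float64]{name: "total"}
	entries := []entry{orderID.Field("ORD-123"), total.Field(99.99)}

	id, ok := orderID.From(entries)
	fmt.Println(id, ok) // ORD-123 true

	// Same name, wrong type: the lookup fails instead of panicking.
	n, ok := typedKey[int]{name: "order_id"}.From(entries)
	fmt.Println(n, ok) // 0 false
}
```

The second return value is what lets callers distinguish a missing field from a field that legitimately holds the zero value.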

func (GenericKey[T]) Name

func (k GenericKey[T]) Name() string

Name returns the semantic identifier.

func (GenericKey[T]) Variant

func (k GenericKey[T]) Variant() Variant

Variant returns the type constraint.

type Int32Key

type Int32Key = GenericKey[int32]

Int32Key is a Key implementation for int32 values.

func NewInt32Key

func NewInt32Key(name string) Int32Key

NewInt32Key creates an Int32Key with the given name.

type Int64Key

type Int64Key = GenericKey[int64]

Int64Key is a Key implementation for int64 values.

func NewInt64Key

func NewInt64Key(name string) Int64Key

NewInt64Key creates an Int64Key with the given name.

type IntKey

type IntKey = GenericKey[int]

IntKey is a Key implementation for int values.

func NewIntKey

func NewIntKey(name string) IntKey

NewIntKey creates an IntKey with the given name.

type Key

type Key interface {
	// Name returns the semantic identifier for this key.
	Name() string

	// Variant returns the type constraint for this key.
	Variant() Variant
}

Key represents a typed semantic identifier for a field. Each Key implementation is bound to a specific Variant, ensuring type safety.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener represents an active subscription to a signal. Call Close() to unregister the listener and prevent further callbacks.

func Hook

func Hook(signal Signal, callback EventCallback) *Listener

Hook registers a callback for the given signal on the default instance. Returns a Listener that can be closed to unregister.

Example:

orderCreated := capitan.NewSignal("order.created", "New order placed")
orderID := capitan.NewStringKey("order_id")

listener := capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    id, _ := orderID.From(e)
    fmt.Printf("Order %s created\n", id)
})
defer listener.Close()

func HookOnce

func HookOnce(signal Signal, callback EventCallback) *Listener

HookOnce registers a callback that fires only once, then automatically unregisters. Returns a Listener that can be closed early to prevent the callback from firing.

Example:

listener := capitan.HookOnce(orderCreated, func(ctx context.Context, e *capitan.Event) {
    // This handler runs at most once
    fmt.Println("First order received!")
})

func (*Listener) Close

func (l *Listener) Close()

Close removes this listener from the registry, preventing future callbacks. Blocks until all events queued before Close was called have been processed.

func (*Listener) Drain

func (l *Listener) Drain(ctx context.Context) error

Drain blocks until all events queued before Drain was called have been processed. Unlike Close, the listener remains active after draining. Returns an error if the context is canceled before drain completes.

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer represents a dynamic subscription that receives events from multiple signals.

Unlike Listener which subscribes to a single signal, Observer can watch all signals or a whitelist of specific signals. Observers automatically attach to new signals as they are created, making them suitable for cross-cutting concerns like logging.

Call Close to unregister all underlying listeners and stop receiving events.

func Observe

func Observe(callback EventCallback, signals ...Signal) *Observer

Observe registers a callback for all signals on the default instance.

If signals are provided, only those signals will be observed (whitelist mode). If no signals are provided, all signals will be observed including future ones.

Returns an Observer that can be closed to unregister all listeners.

Example (logging all events):

observer := capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    log.Printf("[%s] %s: %s",
        e.Severity(),
        e.Signal().Name(),
        e.Signal().Description())
})
defer observer.Close()

Example (whitelist specific signals):

observer := capitan.Observe(handler, orderCreated, orderShipped, orderCanceled)

func (*Observer) Close

func (o *Observer) Close()

Close removes all individual listeners from the registry.

func (*Observer) Drain

func (o *Observer) Drain(ctx context.Context) error

Drain blocks until all events queued before Drain was called have been processed. Unlike Close, the observer remains active after draining. Returns an error if the context is canceled before drain completes.

type Option

type Option func(*Capitan)

Option configures a Capitan instance.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the event queue buffer size for each signal's worker. Default is 16. Larger buffers reduce backpressure but increase memory usage.

func WithPanicHandler

func WithPanicHandler(handler PanicHandler) Option

WithPanicHandler sets a callback to be invoked when a listener panics. The handler receives the signal and the recovered panic value. By default, panics are recovered silently to prevent system crashes.

func WithSyncMode

func WithSyncMode() Option

WithSyncMode enables synchronous event processing for testing. When enabled, Emit() calls listeners directly instead of queueing to workers. This eliminates timing dependencies and makes tests deterministic. Should only be used in tests, not production code.

type PanicHandler

type PanicHandler func(signal Signal, recovered any)

PanicHandler is called when a listener panics during event processing. Receives the signal being processed and the recovered panic value.
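Panic isolation of this kind is typically built on defer and recover. The sketch below shows the general pattern only. The safeCall wrapper and its string signal parameter are assumptions for illustration, not capitan's internal code.

```go
package main

import "fmt"

// safeCall runs listener and converts a panic into a call to onPanic.
// With a nil onPanic the panic is still recovered, just not reported.
func safeCall(signal string, listener func(), onPanic func(signal string, recovered any)) {
	defer func() {
		if r := recover(); r != nil && onPanic != nil {
			onPanic(signal, r)
		}
	}()
	listener()
}

func main() {
	report := func(signal string, recovered any) {
		fmt.Printf("listener for %s panicked: %v\n", signal, recovered)
	}
	safeCall("order.created", func() { panic("boom") }, report)
	safeCall("order.created", func() { fmt.Println("second listener still runs") }, report)
}
```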

type Severity

type Severity string

Severity represents the logging severity level of an event. Compatible with standard logging conventions and OpenTelemetry.

const (
	// SeverityDebug is for development and troubleshooting information.
	SeverityDebug Severity = "DEBUG"
	// SeverityInfo is for normal operational messages (default for Emit).
	SeverityInfo Severity = "INFO"
	// SeverityWarn is for warning conditions that may need attention.
	SeverityWarn Severity = "WARN"
	// SeverityError is for error conditions requiring immediate action.
	SeverityError Severity = "ERROR"
)

Severity levels for events.
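Because Severity is a string type, threshold checks such as MinSeverity cannot rely on comparing the raw values. One way to order the levels is a rank table, sketched here with hypothetical local names and the assumed ordering DEBUG, INFO, WARN, ERROR from lowest to highest.

```go
package main

import "fmt"

type severity string

const (
	sevDebug severity = "DEBUG"
	sevInfo  severity = "INFO"
	sevWarn  severity = "WARN"
	sevError severity = "ERROR"
)

// rank gives each level a numeric position; comparing the strings directly
// would sort "ERROR" below "INFO".
var rank = map[severity]int{sevDebug: 0, sevInfo: 1, sevWarn: 2, sevError: 3}

// passes reports whether an event at level s clears threshold.
// An empty threshold means no filtering.
func passes(s, threshold severity) bool {
	if threshold == "" {
		return true
	}
	return rank[s] >= rank[threshold]
}

func main() {
	fmt.Println(passes(sevDebug, sevInfo)) // false
	fmt.Println(passes(sevError, sevInfo)) // true
	fmt.Println(sevError < sevInfo)        // true: why string comparison is not enough
}
```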

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

Signal represents an event type identifier used for routing events to listeners.

func NewSignal

func NewSignal(name, description string) Signal

NewSignal creates a new Signal with the given name and description. The description is used as the human-readable message when converting to logs.

func (Signal) Description

func (s Signal) Description() string

Description returns the signal's human-readable description.

func (Signal) Name

func (s Signal) Name() string

Name returns the signal's identifier.

type SignalConfig

type SignalConfig struct {
	// BufferSize sets the event queue size for this signal.
	// Default: instance-level bufferSize (typically 16).
	BufferSize int `json:"bufferSize,omitempty"`

	// Disabled suppresses all events for this signal when true.
	Disabled bool `json:"disabled,omitempty"`

	// MinSeverity filters events below this severity level.
	// Events with severity < MinSeverity are silently dropped.
	MinSeverity Severity `json:"minSeverity,omitempty"`

	// MaxListeners caps the number of listeners for this signal.
	// Hook() returns nil when the limit is reached. 0 = unlimited.
	MaxListeners int `json:"maxListeners,omitempty"`

	// DropPolicy controls behavior when the event buffer is full.
	// Default: DropPolicyBlock (wait for space).
	DropPolicy DropPolicy `json:"dropPolicy,omitempty"`

	// RateLimit sets the maximum events per second for this signal.
	// Events exceeding the rate are silently dropped. 0 = unlimited.
	RateLimit float64 `json:"rateLimit,omitempty"`

	// BurstSize sets the burst allowance above the rate limit.
	// Allows short bursts before rate limiting kicks in.
	BurstSize int `json:"burstSize,omitempty"`
}

SignalConfig holds per-signal configuration options. Zero values indicate "use default" for all fields.
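RateLimit and BurstSize read like the two parameters of a token bucket. The sketch below is a classic token bucket under that assumption. Whether capitan uses exactly this algorithm is not stated here, and the bucket type is hypothetical. Time is passed in as seconds so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"math"
)

// bucket refills at rate tokens per second and holds at most burst tokens.
type bucket struct {
	rate, burst float64
	tokens      float64
	last        float64 // time of the previous call, in seconds
}

func newBucket(rate float64, burst int) *bucket {
	return &bucket{rate: rate, burst: float64(burst), tokens: float64(burst)}
}

// allow reports whether an event arriving at time now (in seconds) may pass.
func (b *bucket) allow(now float64) bool {
	b.tokens = math.Min(b.burst, b.tokens+(now-b.last)*b.rate)
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newBucket(2, 3) // 2 events per second, bursts of up to 3
	for _, now := range []float64{0, 0, 0, 0, 0.5, 0.5} {
		fmt.Println(now, b.allow(now))
	}
}
```

In this run the first three events at time 0 pass on the burst allowance, the fourth is rejected, and after half a second one token has refilled so exactly one more event passes.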

type Stats

type Stats struct {
	// ActiveWorkers is the number of worker goroutines currently running.
	ActiveWorkers int

	// SignalCount is the total number of unique signals that have been registered.
	SignalCount int

	// DroppedEvents is the total number of events dropped due to no listeners.
	DroppedEvents uint64

	// QueueDepths maps each signal to the number of events queued in its buffer.
	QueueDepths map[Signal]int

	// ListenerCounts maps each signal to the number of registered listeners.
	ListenerCounts map[Signal]int

	// EmitCounts maps each signal to the total number of times it has been emitted.
	EmitCounts map[Signal]uint64

	// FieldSchemas maps each signal to the keys of fields from its first emission.
	// This provides a schema of what fields are available on each signal.
	FieldSchemas map[Signal][]Key
}

Stats provides runtime metrics for a Capitan instance.

type StringKey

type StringKey = GenericKey[string]

StringKey is a Key implementation for string values.

func NewStringKey

func NewStringKey(name string) StringKey

NewStringKey creates a StringKey with the given name.

type TimeKey

type TimeKey = GenericKey[time.Time]

TimeKey is a Key implementation for time.Time values.

func NewTimeKey

func NewTimeKey(name string) TimeKey

NewTimeKey creates a TimeKey with the given name.

type Uint32Key

type Uint32Key = GenericKey[uint32]

Uint32Key is a Key implementation for uint32 values.

func NewUint32Key

func NewUint32Key(name string) Uint32Key

NewUint32Key creates a Uint32Key with the given name.

type Uint64Key

type Uint64Key = GenericKey[uint64]

Uint64Key is a Key implementation for uint64 values.

func NewUint64Key

func NewUint64Key(name string) Uint64Key

NewUint64Key creates a Uint64Key with the given name.

type UintKey

type UintKey = GenericKey[uint]

UintKey is a Key implementation for uint values.

func NewUintKey

func NewUintKey(name string) UintKey

NewUintKey creates a UintKey with the given name.

type Variant

type Variant string

Variant is a discriminator for the Field interface implementation type. Used for runtime type identification when type assertions are needed.

const (
	VariantString   Variant = "string"
	VariantInt      Variant = "int"
	VariantInt32    Variant = "int32"
	VariantInt64    Variant = "int64"
	VariantUint     Variant = "uint"
	VariantUint32   Variant = "uint32"
	VariantUint64   Variant = "uint64"
	VariantFloat32  Variant = "float32"
	VariantFloat64  Variant = "float64"
	VariantBool     Variant = "bool"
	VariantTime     Variant = "time.Time"
	VariantDuration Variant = "time.Duration"
	VariantBytes    Variant = "[]byte"
	VariantError    Variant = "error"
)

Built-in variants for primitive types. Custom types should use namespaced variant strings (e.g., "myapp.OrderInfo") to avoid collisions.

Directories

Path Synopsis
Package testing provides test utilities and helpers for capitan users.
