Documentation ¶
Overview ¶
Package messenger implements a file-based pub-sub system.
Package servicename provides context helpers for stamping service names on messages.
Service names identify which service published a message. They are useful for debugging and log triage, allowing operators to quickly identify the source of a message even when multiple services publish to the same channel on the same instance.
Set a service name in context before publishing, and it will be automatically stamped on the envelope and delivered to subscribers:
ctx := messenger.WithServiceName(context.Background(), "payment-processor")
messenger.Publish(ctx, "payments", "event.type", payload)
Subscribers receive the service name in the Message:
messenger.Subscribe(ctx, "payments", "sub-id", func(ctx context.Context, msg Message) error {
slog.Info("message", "service", msg.ServiceName)
return nil
})
Service names are preserved across hub forwarding, making them available for tracing in distributed message flows.
Index ¶
- Variables
- func CorrelationIDFromContext(ctx context.Context) string
- func EnsureDirectories(cfg *Config) error
- func ServiceNameFromContext(ctx context.Context) string
- func ValidateChannelName(name string) error
- func WithCorrelationID(ctx context.Context, id string) context.Context
- func WithServiceName(ctx context.Context, name string) context.Context
- type AllowedPeer
- type AuditConfig
- type ClientConfig
- type ClientHubRef
- type Config
- type DedupConfig
- type Duration
- type EphemeralConfig
- type EphemeralMessenger
- func (m *EphemeralMessenger) Close() error
- func (m *EphemeralMessenger) Connect(ctx context.Context) error
- func (m *EphemeralMessenger) Publish(ctx context.Context, channel, payloadType string, payload any) error
- func (m *EphemeralMessenger) RegisterPayloadType(typeStr string, prototype any) error
- func (m *EphemeralMessenger) Subscribe(channel string, handler func(msg Message)) error
- type FederationConfig
- type HandlerFunc
- type HubConfig
- type Logger
- type Message
- type Messenger
- func (m *Messenger) Close() error
- func (m *Messenger) HubAddr() string
- func (m *Messenger) InstanceName() string
- func (m *Messenger) Publish(ctx context.Context, channel, payloadType string, payload any) error
- func (m *Messenger) RegisterPayloadType(typeStr string, prototype any) error
- func (m *Messenger) Subscribe(ctx context.Context, channel, subscriberID string, handler HandlerFunc) error
- func (m *Messenger) Unsubscribe(channel, subscriberID string) error
- type Option
- type StorageConfig
- type SubscribersConfig
- type SyncPolicy
- type TLSConfig
Constants ¶
This section is empty.
Variables ¶
var (
// ErrPayloadTypeAlreadyRegistered is returned by RegisterPayloadType when the
// same type string is registered more than once.
ErrPayloadTypeAlreadyRegistered = errors.New("payload type already registered")
// ErrInvalidChannelName is returned when a channel name is empty, exceeds
// 255 bytes, or contains characters outside [a-zA-Z0-9._-].
ErrInvalidChannelName = errors.New("invalid channel name")
// ErrMessengerClosed is returned when an operation is attempted on a
// Messenger after Close has been called.
ErrMessengerClosed = errors.New("messenger is closed")
)
var ErrEphemeralConnLost = federation.ErrEphemeralConnLost
ErrEphemeralConnLost is returned by EphemeralMessenger.Publish when the connection drops before the hub has acknowledged the message.
Functions ¶
func CorrelationIDFromContext ¶ added in v0.9.0
CorrelationIDFromContext returns the correlation ID stored in ctx, or "" if none.
func EnsureDirectories ¶
EnsureDirectories creates the directory layout required by a Messenger under cfg.Storage.DataDir. It is called automatically by New; callers rarely need it directly.
func ServiceNameFromContext ¶ added in v0.10.0
ServiceNameFromContext retrieves the service name from the context. Returns an empty string if the service name was not set.
func ValidateChannelName ¶
ValidateChannelName returns a wrapped ErrInvalidChannelName if name is empty, exceeds 255 bytes, or contains characters outside [a-zA-Z0-9._-].
func WithCorrelationID ¶ added in v0.9.0
WithCorrelationID returns a context carrying the given correlation ID. Pass the returned context to Publish to stamp all resulting messages with this ID.
Types ¶
type AllowedPeer ¶ added in v0.6.2
type AllowedPeer struct {
// Name is the instance name embedded in the peer's TLS certificate CN.
Name string `yaml:"name"`
// Subscribe lists the channels this peer is permitted to receive (from hub to peer).
// An empty list means the peer may subscribe to any channel.
Subscribe []string `yaml:"subscribe"`
// Publish lists the channels this peer is permitted to send to this hub (from peer to hub).
// An empty list means the peer may publish to any channel.
Publish []string `yaml:"publish"`
}
AllowedPeer is an instance that is permitted to connect to this hub.
type AuditConfig ¶
type AuditConfig struct {
// MaxSizeMB rotates audit.jsonl when it reaches this size. Default: 100.
MaxSizeMB int `yaml:"max_size_mb"`
// MaxFiles is the number of rotated audit files to retain. Default: 10.
MaxFiles int `yaml:"max_files"`
}
AuditConfig controls audit log rotation.
type ClientConfig ¶
type ClientConfig struct {
// Enabled dials the listed hubs on startup when true. Default: false.
Enabled bool `yaml:"enabled"`
// Hubs is the list of hub addresses to connect to.
Hubs []ClientHubRef `yaml:"hubs"`
}
ClientConfig controls outbound hub connections.
type ClientHubRef ¶
type ClientHubRef struct {
// Addr is the host:port of the hub to dial.
Addr string `yaml:"addr"`
// Subscribe lists the channels to request from the hub (inbound to this instance).
// The hub may deliver a subset based on its access control policy.
Subscribe []string `yaml:"subscribe"`
// Publish lists the channels this instance will send to the hub (outbound from this instance).
// The hub may reject a subset based on its receive policy.
Publish []string `yaml:"publish"`
}
ClientHubRef is a hub address a client instance dials.
type Config ¶
type Config struct {
// Name is the human-readable identifier for this instance.
// Defaults to the OS hostname. Use "hostname:port" when multiple instances share a host.
Name string `yaml:"name"`
Storage StorageConfig `yaml:"storage"`
Subscribers SubscribersConfig `yaml:"subscribers"`
Hub HubConfig `yaml:"hub"`
Client ClientConfig `yaml:"client"`
TLS TLSConfig `yaml:"tls"`
Federation FederationConfig `yaml:"federation"`
Dedup DedupConfig `yaml:"dedup"`
Audit AuditConfig `yaml:"audit"`
}
Config is the top-level configuration for a Messenger instance. Load it from YAML with LoadConfig or construct it programmatically.
func LoadConfig ¶
LoadConfig reads a YAML config file, applies defaults, and validates the result.
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults fills in zero-valued fields with their documented defaults. It is called automatically by LoadConfig. When constructing a Config programmatically, call ApplyDefaults before passing it to New.
type DedupConfig ¶
type DedupConfig struct {
// SeenIDLRUSize is the maximum number of message IDs held in the LRU dedup cache.
// Default: 100000.
SeenIDLRUSize int `yaml:"seen_id_lru_size"`
}
DedupConfig controls the in-memory seen-ID deduplication set.
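To make the role of SeenIDLRUSize concrete, here is a minimal sketch of a bounded seen-ID set with least-recently-used eviction. The `seenSet` type and its method are hypothetical; the internal dedup package may be structured quite differently.

```go
package main

import (
	"container/list"
	"fmt"
)

// seenSet remembers at most capacity message IDs, evicting the least
// recently seen one when full. Hypothetical illustration only.
type seenSet struct {
	capacity int
	order    *list.List               // front = most recently seen
	index    map[string]*list.Element // ID -> node in order
}

func newSeenSet(capacity int) *seenSet {
	if capacity < 1 {
		capacity = 1
	}
	return &seenSet{capacity: capacity, order: list.New(), index: make(map[string]*list.Element)}
}

// Seen reports whether id was already in the set and records it either way.
func (s *seenSet) Seen(id string) bool {
	if el, ok := s.index[id]; ok {
		s.order.MoveToFront(el)
		return true
	}
	s.index[id] = s.order.PushFront(id)
	if s.order.Len() > s.capacity {
		oldest := s.order.Back()
		s.order.Remove(oldest)
		delete(s.index, oldest.Value.(string))
	}
	return false
}

func main() {
	s := newSeenSet(2)
	for _, id := range []string{"a", "a", "b", "c", "a"} {
		fmt.Printf("%s duplicate=%t\n", id, s.Seen(id))
	}
}
```

The trade-off in any such cache is that an ID evicted for space is treated as new if it arrives again, so the size bounds how far back duplicates can be detected.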
type Duration ¶ added in v0.11.0
Duration is a time.Duration that unmarshals from a YAML string (e.g. "168h", "30m").
type EphemeralConfig ¶ added in v0.4.0
type EphemeralConfig struct {
// HubAddr is the host:port of the hub to connect to. Required.
HubAddr string
// InstanceName identifies this client in the hub's allowlist.
// Must match the CN of the TLS certificate. Required.
InstanceName string
// Subscribe lists the channels to receive messages from the hub.
// These are declared in the handshake; the hub may deliver a subset
// based on its access control policy. Fixed at connect time.
Subscribe []string
// TLS holds paths to the client's certificate, key, and CA.
// Leave zero-value to use plain-text ws:// (testing/trusted networks only).
TLS TLSConfig
// AutoReconnect enables automatic reconnection on disconnect.
// When true, ConnectWithReconnect is used; when false, Connect is used.
AutoReconnect bool
// ReconnectBase is the initial reconnect backoff. Default: 500ms.
ReconnectBase time.Duration
// ReconnectMax is the maximum reconnect backoff. Default: 60s.
ReconnectMax time.Duration
// ReconnectJitter is the fractional backoff jitter (0–1). Default: 0.2.
ReconnectJitter float64
// MaxBatchBytes is the maximum WebSocket frame payload in bytes. Default: 65536.
MaxBatchBytes int
// WriteQueueSize is the outbound buffer depth. Default: 256.
WriteQueueSize int
}
EphemeralConfig configures an EphemeralMessenger.
type EphemeralMessenger ¶ added in v0.4.0
type EphemeralMessenger struct {
// contains filtered or unexported fields
}
EphemeralMessenger connects to a hub without maintaining local state.
Publish blocks until the hub acknowledges each message. If the connection drops before the ack arrives, Publish returns ErrEphemeralConnLost and the message may or may not have been received by the hub.
Subscribe registers in-memory handlers that receive messages only while connected. On reconnect, delivery resumes from the current hub position; messages published while disconnected are never replayed.
Construct with NewEphemeralMessenger; call Subscribe before Connect.
func NewEphemeralMessenger ¶ added in v0.4.0
func NewEphemeralMessenger(cfg EphemeralConfig, opts ...Option) (*EphemeralMessenger, error)
NewEphemeralMessenger constructs an EphemeralMessenger. Call Subscribe to register inbound handlers, then Connect to dial the hub.
func (*EphemeralMessenger) Close ¶ added in v0.4.0
func (m *EphemeralMessenger) Close() error
Close disconnects from the hub and stops all background goroutines. Pending Publish calls receive ErrEphemeralClosed. Safe to call more than once.
func (*EphemeralMessenger) Connect ¶ added in v0.4.0
func (m *EphemeralMessenger) Connect(ctx context.Context) error
Connect dials the hub and starts the background goroutines.
With AutoReconnect true: returns after the first connection; subsequent reconnects happen transparently in the background until Close is called.
With AutoReconnect false: returns after the initial connection. On disconnect, subsequent Publish calls return ErrEphemeralConnLost until Connect is called again.
func (*EphemeralMessenger) Publish ¶ added in v0.4.0
func (m *EphemeralMessenger) Publish(ctx context.Context, channel, payloadType string, payload any) error
Publish creates an envelope and blocks until the hub acknowledges it. Returns nil on success, ErrEphemeralConnLost if the connection drops before the ack, or a wrapped ctx.Err() if the context is cancelled.
func (*EphemeralMessenger) RegisterPayloadType ¶ added in v0.4.0
func (m *EphemeralMessenger) RegisterPayloadType(typeStr string, prototype any) error
RegisterPayloadType associates typeStr with the Go type of prototype for decoding inbound message payloads delivered to Subscribe handlers. Registering the same typeStr twice returns ErrPayloadTypeAlreadyRegistered.
func (*EphemeralMessenger) Subscribe ¶ added in v0.4.0
func (m *EphemeralMessenger) Subscribe(channel string, handler func(msg Message)) error
Subscribe registers handler for inbound messages on channel. It should be called before Connect so that the channel is included in the handshake. The handler is called synchronously within the receive goroutine, so it must not block.
type FederationConfig ¶
type FederationConfig struct {
// ReconnectBaseMS is the initial reconnection backoff delay in milliseconds. Default: 500.
ReconnectBaseMS int `yaml:"reconnect_base_ms"`
// ReconnectMaxMS is the maximum reconnection backoff delay in milliseconds. Default: 60000.
ReconnectMaxMS int `yaml:"reconnect_max_ms"`
// ReconnectJitter is the fractional jitter applied to the backoff delay (0–1). Default: 0.2.
ReconnectJitter float64 `yaml:"reconnect_jitter"`
// SendBufferMessages is the maximum number of unacknowledged outbound messages buffered
// per peer hub during a disconnection. Messages beyond this limit are dropped with a
// warning. Default: 10000.
SendBufferMessages int `yaml:"send_buffer_messages"`
// MaxBatchBytes is the maximum size of a single WebSocket frame payload in bytes.
// Default: 65536.
MaxBatchBytes int `yaml:"max_batch_bytes"`
}
FederationConfig controls WebSocket reconnection and message batching.
type HandlerFunc ¶
HandlerFunc processes a decoded Message. A non-nil return or a panic triggers the retry and dead-letter logic configured via subscribers.max_retries.
type HubConfig ¶
type HubConfig struct {
// Enabled starts a WebSocket listener on ListenAddr when true. Default: false.
Enabled bool `yaml:"enabled"`
// ListenAddr is the address to listen on, e.g. "0.0.0.0:7740".
// Required when Enabled is true.
ListenAddr string `yaml:"listen_addr"`
// AllowedPeers is the explicit list of peer instance names permitted to connect.
// Connections from instances not in this list are rejected after the mTLS handshake.
AllowedPeers []AllowedPeer `yaml:"allowed_peers"`
// FedClientOffsetTTL is how long a disconnected federation client's offset files
// are retained before the hub deletes them. Offset files block compaction, so
// stale files from clients that never reconnect must be cleaned up.
// Default: 168h (1 week). Set to 0 to disable the TTL sweep entirely.
FedClientOffsetTTL Duration `yaml:"fed_client_offset_ttl"`
}
HubConfig controls the hub listener and its federation policy.
type Logger ¶
type Logger interface {
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
Error(msg string, args ...any)
}
Logger is the structured logging interface callers inject into the Messenger. Its method set is compatible with log/slog.Logger for drop-in use:
m, _ := messenger.New(cfg, messenger.WithLogger(slog.Default()))
type Message ¶
type Message struct {
// ID is the UUID v4 that uniquely identifies this message.
ID string
// Channel is the channel the message was published to.
Channel string
// Origin is the instance name of the original publisher, preserved across
// hub forwarding.
Origin string
// PayloadType is the type discriminator string (e.g. "com.acme.OrderCreated").
PayloadType string
// CorrelationID is an optional application-level identifier used to group
// related messages across a multi-step process.
CorrelationID string
// ServiceName is the name of the service that published this message.
ServiceName string
// Payload is the decoded payload. Its concrete type is the prototype
// registered via RegisterPayloadType, or map[string]any for unknown types.
Payload any
// Timestamp is the UTC time the message was published.
Timestamp time.Time
}
Message is the decoded representation of a stored envelope delivered to a subscriber handler.
type Messenger ¶
type Messenger struct {
// contains filtered or unexported fields
}
Messenger is the top-level pub-sub instance. Construct with New.
func New ¶
New constructs and starts a Messenger. It creates the required directory layout, initialises all internal components, and starts any configured hub listener and client connections.
func (*Messenger) Close ¶
Close gracefully stops all subscribers, peer connections, and internal goroutines. Safe to call more than once.
func (*Messenger) HubAddr ¶
HubAddr returns the network address the hub is listening on, or an empty string if no hub is running. Useful in tests to discover the dynamically assigned port.
func (*Messenger) InstanceName ¶ added in v0.5.0
InstanceName returns the configured instance name for this messenger.
func (*Messenger) Publish ¶
Publish creates an envelope for payload, writes it to channel's storage file, notifies local subscribers, and enqueues it for any configured peer senders. Publish blocks until the write is confirmed (per sync_policy).
func (*Messenger) RegisterPayloadType ¶
RegisterPayloadType associates typeStr with the Go type of prototype for decoding incoming message payloads. Registering the same typeStr twice returns ErrPayloadTypeAlreadyRegistered.
func (*Messenger) Subscribe ¶
func (m *Messenger) Subscribe(ctx context.Context, channel, subscriberID string, handler HandlerFunc) error
Subscribe registers handler for all new messages on channel. Delivery is at-least-once: the handler may be called again for the same message after a restart. The goroutine runs until ctx is cancelled or Unsubscribe is called.
func (*Messenger) Unsubscribe ¶
Unsubscribe stops the subscriber goroutine for (channel, subscriberID) and removes the offset file so the subscriber position is forgotten.
type Option ¶
type Option func(*messengerOptions)
Option is a functional option for New.
func WithConfig ¶
WithConfig supplies a Config via option rather than as the positional argument to New. When both are provided the positional argument takes precedence.
func WithDataDir ¶
WithDataDir overrides Config.Storage.DataDir. Useful in tests or when the data directory is determined at runtime rather than in the config file.
func WithLogger ¶
WithLogger injects a structured logger. If not provided, log output is discarded. The Logger interface is compatible with log/slog.Logger:
messenger.New(cfg, messenger.WithLogger(slog.Default()))
type StorageConfig ¶
type StorageConfig struct {
// DataDir is the root directory for channel files, subscriber offsets, and the audit log.
// Required.
DataDir string `yaml:"data_dir"`
// SyncPolicy controls when writes are flushed to stable storage.
// Default: "periodic".
SyncPolicy SyncPolicy `yaml:"sync_policy"`
// SyncIntervalMS is the fsync interval in milliseconds when SyncPolicy is "periodic".
// Default: 200.
SyncIntervalMS int `yaml:"sync_interval_ms"`
// MaxSubscriberLagMB triggers a warning log when a subscriber's unread backlog
// exceeds this many megabytes. Default: 512.
MaxSubscriberLagMB int `yaml:"max_subscriber_lag_mb"`
// CompactionThresholdMB triggers file rotation when the consumed (already-read-by-all-
// subscribers) portion of a channel file exceeds this size. Default: 256.
CompactionThresholdMB int `yaml:"compaction_threshold_mb"`
// OffsetFlushIntervalMS is the minimum time between subscriber offset file
// flushes (fsync + atomic rename). 0 flushes after every delivered message,
// which is the strictest at-least-once guarantee but slowest on high-latency
// storage. Values > 0 batch flushes for higher throughput; on a crash the
// subscriber may replay up to this many milliseconds of already-delivered
// messages. Default: 0 (flush every message).
OffsetFlushIntervalMS int `yaml:"offset_flush_interval_ms"`
}
StorageConfig controls the on-disk message store.
type SubscribersConfig ¶
type SubscribersConfig struct {
// MaxRetries is the number of times a failing handler is retried before the message
// is routed to the dead-letter channel. Default: 5.
//
// Use a pointer so that an explicit value of 0 (fail immediately to the dead-letter
// channel) is distinguishable from "not set" when loading from YAML. A nil value
// is replaced with the default of 5 by [Config.ApplyDefaults].
MaxRetries *int `yaml:"max_retries"`
}
SubscribersConfig controls subscriber delivery behaviour.
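The pointer field is what lets an explicit `max_retries: 0` survive defaulting. A minimal sketch of that logic, using a local `subscribersConfig` type and a hypothetical `applyDefaults` method rather than the package's own:

```go
package main

import "fmt"

// subscribersConfig is a local stand-in for SubscribersConfig.
type subscribersConfig struct {
	MaxRetries *int
}

// applyDefaults fills in 5 only when the field was never set (nil).
// An explicit 0 is a non-nil pointer and is left untouched.
func (c *subscribersConfig) applyDefaults() {
	if c.MaxRetries == nil {
		def := 5
		c.MaxRetries = &def
	}
}

func main() {
	unset := subscribersConfig{}
	unset.applyDefaults()

	zero := 0
	explicit := subscribersConfig{MaxRetries: &zero}
	explicit.applyDefaults()

	fmt.Println(*unset.MaxRetries, *explicit.MaxRetries)
}
```

With a plain `int` field, both cases would decode to 0 and the default would overwrite the caller's explicit choice.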
type SyncPolicy ¶
type SyncPolicy string
SyncPolicy controls when channel file writes are flushed to stable storage.
const (
// SyncPolicyNone lets the OS flush at its own discretion.
// Publish() returns after write() succeeds. Fastest; data in OS page cache only.
SyncPolicyNone SyncPolicy = "none"
// SyncPolicyPeriodic flushes via a background fsync ticker.
// Publish() returns after write() succeeds; fsync runs on a separate timer.
// Durable against application crashes; not against OS crashes between intervals.
SyncPolicyPeriodic SyncPolicy = "periodic"
// SyncPolicyAlways fsyncs after every write before Publish() returns.
// Slowest; durable against OS crashes and power failure.
SyncPolicyAlways SyncPolicy = "always"
)
type TLSConfig ¶
type TLSConfig struct {
// Cert is the path to the instance's PEM-encoded certificate.
Cert string `yaml:"cert"`
// Key is the path to the instance's PEM-encoded private key.
Key string `yaml:"key"`
// CA is the path to the PEM-encoded CA certificate used to verify peers.
CA string `yaml:"ca"`
// MinVersion is the minimum TLS version. Default: "1.3".
MinVersion string `yaml:"min_version"`
// ExpiryWarnDays triggers a warning log when the instance cert or CA cert expires
// within this many days. Default: 30.
ExpiryWarnDays int `yaml:"expiry_warn_days"`
}
TLSConfig holds paths to the instance's TLS credentials.
Directories ¶
| Path | Synopsis |
|---|---|
| cmd/keyop-messenger (command) | Command keyop-messenger provides the CLI for keyop-messenger key management. |
| internal/audit | Package audit provides a structured audit writer for cross-hub forwarding events. |
| internal/dedup | Package dedup provides deduplication of envelope IDs. |
| internal/envelope | Package envelope defines the message envelope written to every channel file. |
| internal/federation | Package federation implements the hub-to-hub and hub-to-client wire protocol. |
| internal/registry | Package registry maps payload type discriminator strings to Go types for encoding and decoding message payloads. |
| internal/storage | Package storage implements the per-channel file writer and related storage primitives. |
| internal/testutil | Package testutil provides shared test helpers used across internal packages. |
| internal/tlsutil | Package tlsutil provides TLS configuration helpers, certificate generation, and hot-reload support for the keyop-messenger federation layer. |