messenger

package module
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: BSD-2-Clause Imports: 19 Imported by: 0

README

Keyop Messenger

Go Reference

Keyop Messenger is a high-reliability, file-based pub-sub library for Go. It is designed for systems where durability and delivery guarantees are paramount, offering a simple yet robust architecture based on append-only .jsonl files, persistent offset tracking, and mTLS-secured federation.

NOTE: This is still Beta, the API should now be relatively stable.

See also: DESIGN.md for detailed design rationale and architecture.

Key Features

  • At-Least-Once Delivery: Messages are only committed (offset advanced) after successful handler execution.
  • Durable Storage: Every channel is a directory of fixed-size segment files. Atomic appends ensure no record interleaving. Old segments are deleted once all subscribers have consumed them.
  • Persistent Offset Tracking: Subscribers resume exactly where they left off, even after a crash or restart.
  • Low-Latency Dispatch: Uses a dual-layer notification system (in-process LocalNotifier + fsnotify for filesystem events).
  • Type-Safe Payloads: Built-in registry for decoding message bodies into structured Go types.
  • Correlation IDs: Stamp messages with application-level correlation IDs to trace multi-step processes across service boundaries.
  • Reliable Retries & DLQ: Configurable retry logic with automatic routing to .dead-letter channels.
  • Secure Federation: Star-topology federation over mTLS WebSockets. Clients subscribe to specific channels; hubs enforce per-client channel allowlists under explicit policy.
  • Observability: File-based offsets and JSONL records allow operators to use standard Unix tools (cat, grep, tail) for debugging.

Why Keyop Messenger?

Unlike memory-based message brokers, Keyop Messenger treats the filesystem as the single source of truth. This makes it:

  1. Resilient: No complex cluster state to manage. If the file is there, the data is safe.
  2. Transparent: Debugging a stuck subscriber is as simple as cat subscriber.offset.
  3. Low-Overhead: No separate broker process is required for local-only messaging.

Quick Start

Installation
go get github.com/wu/keyop-messenger
Basic Usage
package main

import (
    "context"
    "log/slog"

    messenger "github.com/wu/keyop-messenger"
)

type Alert struct {
    Message string `json:"message"`
}

func main() {
    cfg := &messenger.Config{
        Name: "my-instance",
        Storage: messenger.StorageConfig{
            DataDir: "/var/keyop/my-instance",
        },
    }
    cfg.ApplyDefaults()

    m, err := messenger.New(cfg, messenger.WithLogger(slog.Default()))
    if err != nil {
        panic(err)
    }
    defer m.Close()

    // Register payload types for typed decoding.
    m.RegisterPayloadType("com.example.Alert", Alert{})

    ctx := context.Background()

    // Subscribe before publishing so the handler sees the message.
    m.Subscribe(ctx, "alerts", "worker-1", func(ctx context.Context, msg messenger.Message) error {
        a := msg.Payload.(Alert)
        slog.Info("received",
            "message", a.Message,
            "origin", msg.Origin,
            "service", msg.ServiceName,
        )
        return nil
    })

    // Publish with service identification. Blocks until the write is confirmed to disk.
    pubCtx := messenger.WithServiceName(ctx, "monitor-service")
    m.Publish(pubCtx, "alerts", "com.example.Alert", Alert{Message: "system heat!"})
}
Correlation IDs

Correlation IDs track related messages across multi-step processes. Set a correlation ID via context before publishing, and it will be stamped on the envelope and delivered to subscribers. Useful for tracing a request through multiple services.

// Start a correlated chain of messages
ctx := messenger.WithCorrelationID(context.Background(), "order-123")

// All messages published with this context carry the same correlation ID
m.Publish(ctx, "orders", "com.example.OrderCreated", &order)
m.Publish(ctx, "payments", "com.example.ChargeOrder", &charge)
m.Publish(ctx, "shipping", "com.example.ShipOrder", &shipment)

// Subscribers receive the correlation ID
m.Subscribe(ctx, "orders", "processor", func(ctx context.Context, msg messenger.Message) error {
    // msg.CorrelationID == "order-123"

    // Propagate to downstream services
    downstreamCtx := messenger.WithCorrelationID(ctx, msg.CorrelationID)
    m.Publish(downstreamCtx, "next-channel", "com.example.NextEvent", &event)

    return nil
})
Service Names

Service names identify which service published a message. Set a service name via context before publishing, and it will be stamped on the envelope and delivered to subscribers. Useful for debugging and log triage.

// Publish from a specific service
ctx := messenger.WithServiceName(context.Background(), "payment-processor")
m.Publish(ctx, "payments", "com.example.ChargeCompleted", &charge)

// Subscribers can see which service published the message
m.Subscribe(ctx, "payments", "auditor", func(ctx context.Context, msg messenger.Message) error {
    slog.Info("payment processed",
        "service", msg.ServiceName,  // "payment-processor"
        "origin", msg.Origin,        // instance name
        "id", msg.ID,
    )
    return nil
})

Service names work well with correlation IDs — both can be set in the same context:

ctx := messenger.WithServiceName(context.Background(), "orders")
ctx = messenger.WithCorrelationID(ctx, "order-789")

m.Publish(ctx, "orders", "com.example.OrderCreated", &order)
Certificate Generation (for Federation)
# Install the CLI
go install github.com/wu/keyop-messenger/cmd/keyop-messenger@latest

# Generate a CA (once per cluster)
keyop-messenger keygen ca --out-cert ca.crt --out-key ca.key

# Generate a per-instance certificate
keyop-messenger keygen instance \
  --ca ca.crt --ca-key ca.key \
  --name billing-host \
  --out-cert billing-host.crt \
  --out-key  billing-host.key

Ephemeral Client

EphemeralMessenger connects to a hub without maintaining any local storage. Use it when:

  • You need to publish with delivery confirmation (blocks until the hub has written the message to disk) but do not want to manage a data directory.
  • You want to receive live messages only while connected, with no replay of messages published during a disconnect.
Publishing
em, err := messenger.NewEphemeralMessenger(messenger.EphemeralConfig{
    HubAddr:      "hub.example.com:7740",
    InstanceName: "transient-service",
    TLS: messenger.TLSConfig{
        Cert: "transient-service.crt",
        Key:  "transient-service.key",
        CA:   "ca.crt",
    },
})
if err != nil {
    panic(err)
}
defer em.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := em.Connect(ctx); err != nil {
    panic(err)
}

// Publish blocks until the hub acks (message is on disk) or ctx expires.
err = em.Publish(ctx, "channelname", "com.example.Event", order)
if errors.Is(err, messenger.ErrEphemeralConnLost) {
    // Connection dropped before ack — message may or may not have been received.
    // Retry if idempotent; check or alert otherwise.
}
Subscribing
em, err := messenger.NewEphemeralMessenger(messenger.EphemeralConfig{
    HubAddr:      "hub.example.com:7740",
    InstanceName: "dashboard",
    Subscribe:    []string{"metrics"},   // declare channels before Connect
    TLS:          messenger.TLSConfig{Cert: "dashboard.crt", Key: "dashboard.key", CA: "ca.crt"},
    AutoReconnect: true,
})

em.Subscribe("metrics", func(msg messenger.Message) {
    fmt.Println("live metric:", msg.Payload)
})

em.Connect(ctx) // returns after first connection; reconnects in background

Handler errors are logged but do not stop delivery. On reconnect, delivery resumes from the current hub position — messages published while disconnected are never replayed.

Auto-Reconnect

Set AutoReconnect: true to reconnect automatically with exponential backoff after a disconnect. The default backoff starts at 500 ms and caps at 60 s with ±20% jitter. Pending Publish calls that have not yet been enqueued block until reconnected; in-flight calls at the moment of disconnect return ErrEphemeralConnLost.

Differences from Messenger
Messenger EphemeralMessenger
Local storage .jsonl files per channel None
Subscribe replay Resumes from last offset on restart No replay; live-only
Publish ack Write confirmed to local disk Hub confirmed to disk; connection loss = ErrEphemeralConnLost
Data directory Required Not required

Architecture

Keyop Messenger follows a Hub-and-Spoke model:

  • Clients: Connect to a Hub to publish or subscribe to channels.
  • Hubs: Manage local .jsonl files, coordinate with peer Hubs, and deliver messages to subscribers using a file-reader pull model.
  • Channels: Each channel is a directory of append-only .jsonl segment files. Once all subscribers (local and federation) have consumed a segment it is deleted — no copying, no writer pauses.
  • Offsets: Each subscriber has a unique .offset file tracking its last read byte position across all segments. Federation peer offsets are stored under subscribers/{channel}/fed-{peerName}.offset and are automatically included in compaction boundary calculations.
Federation Delivery Model

The hub delivers messages to subscribed federation peers using the same mechanism as local subscribers:

  1. When a message is written to a channel, the hub calls NotifyChannel(channel), waking a channelReader goroutine for each peer subscribed to that channel.
  2. The channelReader reads from segment files starting at the peer's last byte offset, batches envelopes (up to max_batch_bytes), and delivers them via WebSocket.
  3. After the peer acknowledges the batch, the byte offset is persisted atomically to subscribers/{channel}/fed-{peerName}.offset.
  4. On reconnect, delivery resumes from the stored offset — no last_id handshake field is needed.

Offset files for peers that disconnect and never reconnect are cleaned up by a TTL sweep (configurable via hub.fed_client_offset_ttl, default 1 week).

Development Commands

make test               # unit tests with race detector
make test-integration   # integration tests (build tag: integration)
make bench              # benchmarks
make lint               # golangci-lint
make build              # verify compilation

Or directly:

go test -race ./...
go test -race -tags integration -timeout 60s ./...
go test -run='^$' -bench=. -benchmem -benchtime=3s ./...
golangci-lint run ./...

License

BSD 2-Clause License. See LICENSE for details.

Documentation

Overview

Package messenger implements a file-based pub-sub system.

Package servicename provides context helpers for stamping service names on messages.

Service names identify which service published a message. They are useful for debugging and log triage, allowing operators to quickly identify the source of a message even when multiple services publish to the same channel on the same instance.

Set a service name in context before publishing, and it will be automatically stamped on the envelope and delivered to subscribers:

ctx := messenger.WithServiceName(context.Background(), "payment-processor")
messenger.Publish(ctx, "payments", "event.type", payload)

Subscribers receive the service name in the Message:

messenger.Subscribe(ctx, "payments", "sub-id", func(ctx context.Context, msg Message) error {
    slog.Info("message", "service", msg.ServiceName)
    return nil
})

Service names are preserved across hub forwarding, making them available for tracing in distributed message flows.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrPayloadTypeAlreadyRegistered is returned by RegisterPayloadType when the
	// same type string is registered more than once.
	ErrPayloadTypeAlreadyRegistered = errors.New("payload type already registered")

	// ErrInvalidChannelName is returned when a channel name is empty, exceeds
	// 255 bytes, or contains characters outside [a-zA-Z0-9._-].
	ErrInvalidChannelName = errors.New("invalid channel name")

	// ErrMessengerClosed is returned when an operation is attempted on a
	// Messenger after Close has been called.
	ErrMessengerClosed = errors.New("messenger is closed")
)
View Source
var ErrEphemeralConnLost = federation.ErrEphemeralConnLost

ErrEphemeralConnLost is returned by EphemeralMessenger.Publish when the connection drops before the hub has acknowledged the message.

Functions

func CorrelationIDFromContext added in v0.9.0

func CorrelationIDFromContext(ctx context.Context) string

CorrelationIDFromContext returns the correlation ID stored in ctx, or "" if none.

func EnsureDirectories

func EnsureDirectories(cfg *Config) error

EnsureDirectories creates the directory layout required by a Messenger under cfg.Storage.DataDir. It is called automatically by New; callers rarely need it directly.

func ServiceNameFromContext added in v0.10.0

func ServiceNameFromContext(ctx context.Context) string

ServiceNameFromContext retrieves the service name from the context. Returns an empty string if the service name was not set.

func ValidateChannelName

func ValidateChannelName(name string) error

ValidateChannelName returns a wrapped ErrInvalidChannelName if name is empty, exceeds 255 bytes, or contains characters outside [a-zA-Z0-9._-].

func WithCorrelationID added in v0.9.0

func WithCorrelationID(ctx context.Context, id string) context.Context

WithCorrelationID returns a context carrying the given correlation ID. Pass the returned context to Publish to stamp all resulting messages with this ID.

func WithServiceName added in v0.10.0

func WithServiceName(ctx context.Context, name string) context.Context

WithServiceName returns a new context with the service name set. The service name is typically the name of the service that is publishing messages.

Types

type AllowedPeer added in v0.6.2

type AllowedPeer struct {
	// Name is the instance name embedded in the peer's TLS certificate CN.
	Name string `yaml:"name"`
	// Subscribe lists the channels this peer is permitted to receive (from hub to peer).
	// An empty list means the peer may subscribe to any channel.
	Subscribe []string `yaml:"subscribe"`
	// Publish lists the channels this peer is permitted to send to this hub (from peer to hub).
	// An empty list means the peer may publish to any channel.
	Publish []string `yaml:"publish"`
}

AllowedPeer is an instance that is permitted to connect to this hub.

type AuditConfig

type AuditConfig struct {
	// MaxSizeMB rotates audit.jsonl when it reaches this size. Default: 100.
	MaxSizeMB int `yaml:"max_size_mb"`

	// MaxFiles is the number of rotated audit files to retain. Default: 10.
	MaxFiles int `yaml:"max_files"`
}

AuditConfig controls audit log rotation.

type ClientConfig

type ClientConfig struct {
	// Enabled dials the listed hubs on startup when true. Default: false.
	Enabled bool `yaml:"enabled"`

	// Hubs is the list of hub addresses to connect to.
	Hubs []ClientHubRef `yaml:"hubs"`
}

ClientConfig controls outbound hub connections.

type ClientHubRef

type ClientHubRef struct {
	// Addr is the host:port of the hub to dial.
	Addr string `yaml:"addr"`
	// Subscribe lists the channels to request from the hub (inbound to this instance).
	// The hub may deliver a subset based on its access control policy.
	Subscribe []string `yaml:"subscribe"`
	// Publish lists the channels this instance will send to the hub (outbound from this instance).
	// The hub may reject a subset based on its receive policy.
	Publish []string `yaml:"publish"`
}

ClientHubRef is a hub address a client instance dials.

type Config

type Config struct {
	// Name is the human-readable identifier for this instance.
	// Defaults to the OS hostname. Use "hostname:port" when multiple instances share a host.
	Name string `yaml:"name"`

	Storage     StorageConfig     `yaml:"storage"`
	Subscribers SubscribersConfig `yaml:"subscribers"`
	Hub         HubConfig         `yaml:"hub"`
	Client      ClientConfig      `yaml:"client"`
	TLS         TLSConfig         `yaml:"tls"`
	Federation  FederationConfig  `yaml:"federation"`
	Dedup       DedupConfig       `yaml:"dedup"`
	Audit       AuditConfig       `yaml:"audit"`
}

Config is the top-level configuration for a Messenger instance. Load it from YAML with LoadConfig or construct it programmatically.

func LoadConfig

func LoadConfig(path string) (*Config, error)

LoadConfig reads a YAML config file, applies defaults, and validates the result.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults fills in zero-valued fields with their documented defaults. It is called automatically by LoadConfig. When constructing a Config programmatically, call ApplyDefaults before passing it to New.

func (*Config) Validate

func (c *Config) Validate() error

Validate returns an error describing all configuration problems found. It expects [ApplyDefaults] to have been called first.

type DedupConfig

type DedupConfig struct {
	// SeenIDLRUSize is the maximum number of message IDs held in the LRU dedup cache.
	// Default: 100000.
	SeenIDLRUSize int `yaml:"seen_id_lru_size"`
}

DedupConfig controls the in-memory seen-ID deduplication set.

type Duration added in v0.11.0

type Duration struct{ time.Duration }

Duration is a time.Duration that unmarshals from a YAML string (e.g. "168h", "30m").

func (*Duration) UnmarshalYAML added in v0.11.0

func (d *Duration) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML implements yaml.Unmarshaler so Duration fields accept Go duration strings.

type EphemeralConfig added in v0.4.0

type EphemeralConfig struct {
	// HubAddr is the host:port of the hub to connect to. Required.
	HubAddr string
	// InstanceName identifies this client in the hub's allowlist.
	// Must match the CN of the TLS certificate. Required.
	InstanceName string
	// Subscribe lists the channels to receive messages from the hub.
	// These are declared in the handshake; the hub may deliver a subset
	// based on its access control policy. Fixed at connect time.
	Subscribe []string
	// TLS holds paths to the client's certificate, key, and CA.
	// Leave zero-value to use plain-text ws:// (testing/trusted networks only).
	TLS TLSConfig
	// AutoReconnect enables automatic reconnection on disconnect.
	// When true, ConnectWithReconnect is used; when false, Connect is used.
	AutoReconnect bool
	// ReconnectBase is the initial reconnect backoff. Default: 500ms.
	ReconnectBase time.Duration
	// ReconnectMax is the maximum reconnect backoff. Default: 60s.
	ReconnectMax time.Duration
	// ReconnectJitter is the fractional backoff jitter (0–1). Default: 0.2.
	ReconnectJitter float64
	// MaxBatchBytes is the maximum WebSocket frame payload in bytes. Default: 65536.
	MaxBatchBytes int
	// WriteQueueSize is the outbound buffer depth. Default: 256.
	WriteQueueSize int
}

EphemeralConfig configures an EphemeralMessenger.

type EphemeralMessenger added in v0.4.0

type EphemeralMessenger struct {
	// contains filtered or unexported fields
}

EphemeralMessenger connects to a hub without maintaining local state.

Publish blocks until the hub acknowledges each message. If the connection drops before the ack arrives, Publish returns ErrEphemeralConnLost and the message may or may not have been received by the hub.

Subscribe registers in-memory handlers that receive messages only while connected. On reconnect, delivery resumes from the current hub position; messages published while disconnected are never replayed.

Construct with NewEphemeralMessenger; call Subscribe before Connect.

func NewEphemeralMessenger added in v0.4.0

func NewEphemeralMessenger(cfg EphemeralConfig, opts ...Option) (*EphemeralMessenger, error)

NewEphemeralMessenger constructs an EphemeralMessenger. Call Subscribe to register inbound handlers, then Connect to dial the hub.

func (*EphemeralMessenger) Close added in v0.4.0

func (m *EphemeralMessenger) Close() error

Close disconnects from the hub and stops all background goroutines. Pending Publish calls receive ErrEphemeralClosed. Safe to call more than once.

func (*EphemeralMessenger) Connect added in v0.4.0

func (m *EphemeralMessenger) Connect(ctx context.Context) error

Connect dials the hub and starts the background goroutines.

With AutoReconnect true: returns after the first connection; subsequent reconnects happen transparently in the background until Close is called.

With AutoReconnect false: returns after the initial connection. On disconnect, subsequent Publish calls return ErrEphemeralConnLost until Connect is called again.

func (*EphemeralMessenger) Publish added in v0.4.0

func (m *EphemeralMessenger) Publish(ctx context.Context, channel, payloadType string, payload any) error

Publish creates an envelope and blocks until the hub acknowledges it. Returns nil on success, ErrEphemeralConnLost if the connection drops before the ack, or a wrapped ctx.Err() if the context is cancelled.

func (*EphemeralMessenger) RegisterPayloadType added in v0.4.0

func (m *EphemeralMessenger) RegisterPayloadType(typeStr string, prototype any) error

RegisterPayloadType associates typeStr with the Go type of prototype for decoding inbound message payloads delivered to Subscribe handlers. Registering the same typeStr twice returns ErrPayloadTypeAlreadyRegistered.

func (*EphemeralMessenger) Subscribe added in v0.4.0

func (m *EphemeralMessenger) Subscribe(channel string, handler func(msg Message)) error

Subscribe registers handler for inbound messages on channel. Should be called before Connect so the channel is included in the handshake. Handler is called synchronously within the receive goroutine; it must not block.

type FederationConfig

type FederationConfig struct {
	// ReconnectBaseMS is the initial reconnection backoff delay in milliseconds. Default: 500.
	ReconnectBaseMS int `yaml:"reconnect_base_ms"`

	// ReconnectMaxMS is the maximum reconnection backoff delay in milliseconds. Default: 60000.
	ReconnectMaxMS int `yaml:"reconnect_max_ms"`

	// ReconnectJitter is the fractional jitter applied to the backoff delay (0–1). Default: 0.2.
	ReconnectJitter float64 `yaml:"reconnect_jitter"`

	// SendBufferMessages is the maximum number of unacknowledged outbound messages buffered
	// per peer hub during a disconnection. Messages beyond this limit are dropped with a
	// warning. Default: 10000.
	SendBufferMessages int `yaml:"send_buffer_messages"`

	// MaxBatchBytes is the maximum size of a single WebSocket frame payload in bytes.
	// Default: 65536.
	MaxBatchBytes int `yaml:"max_batch_bytes"`
}

FederationConfig controls WebSocket reconnection and message batching.

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg Message) error

HandlerFunc processes a decoded Message. A non-nil return or a panic triggers the retry and dead-letter logic configured via subscribers.max_retries.

type HubConfig

type HubConfig struct {
	// Enabled starts a WebSocket listener on ListenAddr when true. Default: false.
	Enabled bool `yaml:"enabled"`

	// ListenAddr is the address to listen on, e.g. "0.0.0.0:7740".
	// Required when Enabled is true.
	ListenAddr string `yaml:"listen_addr"`

	// AllowedPeers is the explicit list of peer instance names permitted to connect.
	// Connections from instances not in this list are rejected after the mTLS handshake.
	AllowedPeers []AllowedPeer `yaml:"allowed_peers"`

	// FedClientOffsetTTL is how long a disconnected federation client's offset files
	// are retained before the hub deletes them. Offset files block compaction, so
	// stale files from clients that never reconnect must be cleaned up.
	// Default: 168h (1 week). Set to 0 to disable the TTL sweep entirely.
	FedClientOffsetTTL Duration `yaml:"fed_client_offset_ttl"`
}

HubConfig controls the hub listener and its federation policy.

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger is the structured logging interface callers inject into the Messenger. Its method set is compatible with log/slog.Logger for drop-in use:

m, _ := messenger.New(cfg, messenger.WithLogger(slog.Default()))

type Message

type Message struct {
	// ID is the UUID v4 that uniquely identifies this message.
	ID string
	// Channel is the channel the message was published to.
	Channel string
	// Origin is the instance name of the original publisher, preserved across
	// hub forwarding.
	Origin string
	// PayloadType is the type discriminator string (e.g. "com.acme.OrderCreated").
	PayloadType string
	// CorrelationID is an optional application-level identifier used to group
	// related messages across a multi-step process.
	CorrelationID string
	// ServiceName is the name of the service that published this message.
	ServiceName string
	// Payload is the decoded payload. Its concrete type is the prototype
	// registered via RegisterPayloadType, or map[string]any for unknown types.
	Payload any
	// Timestamp is the UTC time the message was published.
	Timestamp time.Time
}

Message is the decoded representation of a stored envelope delivered to a subscriber handler.

type Messenger

type Messenger struct {
	// contains filtered or unexported fields
}

Messenger is the top-level pub-sub instance. Construct with New.

func New

func New(cfg *Config, opts ...Option) (*Messenger, error)

New constructs and starts a Messenger. It creates the required directory layout, initialises all internal components, and starts any configured hub listener and client connections.

func (*Messenger) Close

func (m *Messenger) Close() error

Close gracefully stops all subscribers, peer connections, and internal goroutines. Safe to call more than once.

func (*Messenger) HubAddr

func (m *Messenger) HubAddr() string

HubAddr returns the network address the hub is listening on, or empty string if no hub is running. Useful in tests to discover the dynamically-assigned port.

func (*Messenger) InstanceName added in v0.5.0

func (m *Messenger) InstanceName() string

InstanceName returns the configured instance name for this messenger.

func (*Messenger) Publish

func (m *Messenger) Publish(ctx context.Context, channel, payloadType string, payload any) error

Publish creates an envelope for payload, writes it to channel's storage file, notifies local subscribers, and enqueues it for any configured peer senders. Publish blocks until the write is confirmed (per sync_policy).

func (*Messenger) RegisterPayloadType

func (m *Messenger) RegisterPayloadType(typeStr string, prototype any) error

RegisterPayloadType associates typeStr with the Go type of prototype for decoding incoming message payloads. Registering the same typeStr twice returns ErrPayloadTypeAlreadyRegistered.

func (*Messenger) Subscribe

func (m *Messenger) Subscribe(ctx context.Context, channel, subscriberID string, handler HandlerFunc) error

Subscribe registers handler for all new messages on channel. Delivery is at-least-once: the handler may be called again for the same message after a restart. The goroutine runs until ctx is cancelled or Unsubscribe is called.

func (*Messenger) Unsubscribe

func (m *Messenger) Unsubscribe(channel, subscriberID string) error

Unsubscribe stops the subscriber goroutine for (channel, subscriberID) and removes the offset file so the subscriber position is forgotten.

type Option

type Option func(*messengerOptions)

Option is a functional option for New.

func WithConfig

func WithConfig(cfg *Config) Option

WithConfig supplies a Config via option rather than as the positional argument to New. When both are provided the positional argument takes precedence.

func WithDataDir

func WithDataDir(dir string) Option

WithDataDir overrides Config.Storage.DataDir. Useful in tests or when the data directory is determined at runtime rather than in the config file.

func WithLogger

func WithLogger(l Logger) Option

WithLogger injects a structured logger. If not provided, log output is discarded. The Logger interface is compatible with log/slog.Logger:

messenger.New(cfg, messenger.WithLogger(slog.Default()))

type StorageConfig

type StorageConfig struct {
	// DataDir is the root directory for channel files, subscriber offsets, and the audit log.
	// Required.
	DataDir string `yaml:"data_dir"`

	// SyncPolicy controls when writes are flushed to stable storage.
	// Default: "periodic".
	SyncPolicy SyncPolicy `yaml:"sync_policy"`

	// SyncIntervalMS is the fsync interval in milliseconds when SyncPolicy is "periodic".
	// Default: 200.
	SyncIntervalMS int `yaml:"sync_interval_ms"`

	// MaxSubscriberLagMB triggers a warning log when a subscriber's unread backlog
	// exceeds this many megabytes. Default: 512.
	MaxSubscriberLagMB int `yaml:"max_subscriber_lag_mb"`

	// CompactionThresholdMB triggers file rotation when the consumed (already-read-by-all-
	// subscribers) portion of a channel file exceeds this size. Default: 256.
	CompactionThresholdMB int `yaml:"compaction_threshold_mb"`

	// OffsetFlushIntervalMS is the minimum time between subscriber offset file
	// flushes (fsync + atomic rename). 0 flushes after every delivered message,
	// which is the strictest at-least-once guarantee but slowest on high-latency
	// storage. Values > 0 batch flushes for higher throughput; on a crash the
	// subscriber may replay up to this many milliseconds of already-delivered
	// messages. Default: 0 (flush every message).
	OffsetFlushIntervalMS int `yaml:"offset_flush_interval_ms"`
}

StorageConfig controls the on-disk message store.

type SubscribersConfig

type SubscribersConfig struct {
	// MaxRetries is the number of times a failing handler is retried before the message
	// is routed to the dead-letter channel. Default: 5.
	//
	// Use a pointer so that an explicit value of 0 (fail immediately to the dead-letter
	// channel) is distinguishable from "not set" when loading from YAML. A nil value
	// is replaced with the default of 5 by [Config.ApplyDefaults].
	MaxRetries *int `yaml:"max_retries"`
}

SubscribersConfig controls subscriber delivery behaviour.

type SyncPolicy

type SyncPolicy string

SyncPolicy controls when channel file writes are flushed to stable storage.

const (
	// SyncPolicyNone lets the OS flush at its own discretion.
	// Publish() returns after write() succeeds. Fastest; data in OS page cache only.
	SyncPolicyNone SyncPolicy = "none"

	// SyncPolicyPeriodic flushes via a background fsync ticker.
	// Publish() returns after write() succeeds; fsync runs on a separate timer.
	// Durable against application crashes; not against OS crashes between intervals.
	SyncPolicyPeriodic SyncPolicy = "periodic"

	// SyncPolicyAlways fsyncs after every write before Publish() returns.
	// Slowest; durable against OS crashes and power failure.
	SyncPolicyAlways SyncPolicy = "always"
)

type TLSConfig

type TLSConfig struct {
	// Cert is the path to the instance's PEM-encoded certificate.
	Cert string `yaml:"cert"`

	// Key is the path to the instance's PEM-encoded private key.
	Key string `yaml:"key"`

	// CA is the path to the PEM-encoded CA certificate used to verify peers.
	CA string `yaml:"ca"`

	// MinVersion is the minimum TLS version. Default: "1.3".
	MinVersion string `yaml:"min_version"`

	// ExpiryWarnDays triggers a warning log when the instance cert or CA cert expires
	// within this many days. Default: 30.
	ExpiryWarnDays int `yaml:"expiry_warn_days"`
}

TLSConfig holds paths to the instance's TLS credentials.

Directories

Path Synopsis
cmd
keyop-messenger command
Command keyop-messenger provides the CLI for keyop-messenger key management.
Command keyop-messenger provides the CLI for keyop-messenger key management.
internal
audit
Package audit provides a structured audit writer for cross-hub forwarding events.
Package audit provides a structured audit writer for cross-hub forwarding events.
dedup
Package dedup provides deduplication of envelope IDs.
Package dedup provides deduplication of envelope IDs.
envelope
Package envelope defines the message envelope written to every channel file.
Package envelope defines the message envelope written to every channel file.
federation
Package federation implements the hub-to-hub and hub-to-client wire protocol.
Package federation implements the hub-to-hub and hub-to-client wire protocol.
registry
Package registry maps payload type discriminator strings to Go types for encoding and decoding message payloads.
Package registry maps payload type discriminator strings to Go types for encoding and decoding message payloads.
storage
Package storage implements the per-channel file writer and related storage primitives.
Package storage implements the per-channel file writer and related storage primitives.
testutil
Package testutil provides shared test helpers used across internal packages.
Package testutil provides shared test helpers used across internal packages.
tlsutil
Package tlsutil provides TLS configuration helpers, certificate generation, and hot-reload support for the keyop-messenger federation layer.
Package tlsutil provides TLS configuration helpers, certificate generation, and hot-reload support for the keyop-messenger federation layer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL