tavern

package module
v0.4.76 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 19 Imported by: 0

README

tavern

Go Reference

tavern

Live hypermedia delivery layer for Server-Sent Events (SSE) in Go.

A master of the React School visit Grug at cave.

Master say: "but how do you manage state?"

Grug say: "server manage state."

Master say: "but how does the client know when state changes?"

Grug say: "server tell it."

Master say: "but--"

Grug say: "server. tell. it."

-- The Recorded Sayings of Layman Grug, The Dothog Manifesto

Tavern delivers server-owned representations to browser clients over SSE. The server decides what changed; Tavern pushes it honestly -- with replay, reconnection recovery, and delivery shaping built in. No JavaScript framework required.

For practical patterns and integration examples, see the Recipe Cookbook.

Design boundaries

Tavern has an explicit design note covering what belongs in core, what belongs outside it, and what Tavern should refuse to become. See DESIGN.md.

The design guides in docs/ define the vocabulary and patterns for building on top of Tavern:

  • Topic Semantics -- resource, collection, presence, admin, and notification topic shapes. When to use path scoping vs broker scoping. Which subscription type fits which page.
  • Snapshot and Replay Patterns -- per-topic-category replay strategies, gap handling, reconnection UX. Why PublishWithID is required for Last-Event-ID recovery.
  • Page-Level Multiplexing -- single-connection multi-topic pages. StaticGroup vs DynamicGroup vs SubscribeMulti. OOB fragment composition for multi-region updates.

Where Tavern Shines

Tavern is a delivery layer for server-owned live representations, but some patterns fall out of it so naturally that they deserve a callout.

SaaS Notifications -- Scoped subscriptions + filters + TTL + replay + OOB fragments = complete real-time notification system. Per-user streams, org-wide broadcasts, toast auto-expiry, reconnection recovery. Wire it up to your existing auth middleware and you have per-tenant push notifications without a third-party service.

Live Dashboards -- Snapshot+delta streams, scheduled publisher with circuit breakers, adaptive backpressure for mixed client speeds, enhanced observability for monitoring the monitor. This is what tavern was built for.

Sports/Event Scoreboards -- Topic groups for single-connection multi-game views, hierarchical topics for league/team filtering, gap detection for seamless reconnection, batch publish for atomic multi-region updates.

E-commerce Real-time -- TTL for flash banners and cart timers, batch publish for inventory+price+availability in one flush, presence for "X people viewing," middleware for audit trails.

HTMX Server-Driven UI -- Tavern's home turf. OOB fragment swaps, lazy rendering that skips work when nobody's watching, templ component integration, mutation hooks that decouple handlers from SSE updates. The server owns the state, HTML goes over the wire.

Multi-Instance Deployment -- Pluggable backend interface, memory backend for testing, scope-aware message envelopes. Publish on instance A, subscribers on instance B get it.


Install

go get github.com/catgoose/tavern

Optional adapters (separate modules to avoid dependency pollution):

go get github.com/catgoose/tavern/tavernprom   # Prometheus export
go get github.com/catgoose/tavern/tavernotel   # OpenTelemetry export

Client-Side Helpers

Tavern emits control events (tavern-reconnected, tavern-replay-gap, tavern-topics-changed) over the SSE stream. The companion library tavern-js listens for these events and translates them into declarative UI behaviors — reconnection overlays, gap recovery, and topic change notifications — with zero custom JavaScript:

<script src="https://cdn.jsdelivr.net/gh/catgoose/tavern-js@latest/dist/tavern.min.js"></script>
<div sse-connect="/sse/notifications"
     sse-swap="message"
     data-tavern-reconnecting-class="opacity-50"
     data-tavern-gap-action="banner">

  <div data-tavern-status class="hidden">Reconnecting...</div>
</div>

See the tavern-js README for full API documentation, data attributes, and examples.


Quick start

broker := tavern.NewSSEBroker()
defer broker.Close()

ch, unsub := broker.Subscribe("events")
defer unsub()

broker.Publish("events", tavern.NewSSEMessage("update", `{"id":1}`).String())

for msg := range ch {
    // handle msg
}

Wire it up to an HTTP handler (works with any router):

// One line -- sets SSE headers, handles Last-Event-ID, streams with flush
mux.Handle("/sse/events", broker.SSEHandler("events"))

Or the manual way (Echo shown):

func sseHandler(broker *tavern.SSEBroker) echo.HandlerFunc {
    return func(c echo.Context) error {
        c.Response().Header().Set("Content-Type", "text/event-stream")
        c.Response().Header().Set("Cache-Control", "no-cache")
        c.Response().Header().Set("Connection", "keep-alive")

        ch, unsub := broker.Subscribe("events")
        defer unsub()

        for {
            select {
            case msg, ok := <-ch:
                if !ok {
                    return nil
                }
                if _, err := fmt.Fprint(c.Response(), msg); err != nil {
                    return nil
                }
                c.Response().Flush()
            case <-c.Request().Context().Done():
                return nil
            }
        }
    }
}

Override the built-in handler's write step for custom formatting:

mux.Handle("/sse", broker.SSEHandler("events",
    tavern.WithSSEWriter(func(w http.ResponseWriter, msg string) error {
        return myCustomWrite(w, msg)
    }),
))

Core pub/sub

The server sends a representation. The representation contains links and forms. The client follows them. THAT IS THE ENTIRE INTERACTION MODEL.

-- The Wisdom of the Uniform Interface, The Dothog Manifesto

The server speaks; the client listens. This is the natural order.

Subscribe / Publish / Unsubscribe / Close
ch, unsub := broker.Subscribe("events")
defer unsub()

broker.Publish("events", "hello, world")
broker.Close() // closes all channels, removes all topics

Publish fans out to every subscriber. Non-blocking -- if a subscriber's buffer is full, the message is silently dropped for that subscriber.

Scoped subscriptions (PublishTo)

Per-user, per-tenant, or per-resource message delivery:

ch, unsub := broker.SubscribeScoped("notifications", userID)
defer unsub()

broker.PublishTo("notifications", userID, msg)
broker.PublishOOBTo("notifications", userID, tavern.Replace("badge", `<span>3</span>`))

Scoped and unscoped subscribers are fully independent. Publish delivers only to unscoped; PublishTo delivers only to matching scoped subscribers.

Multiplexed subscriptions (SubscribeMulti)

Subscribe to multiple topics on a single channel, eliminating reflect.Select:

ch, unsub := broker.SubscribeMulti("network", "services", "alerts")
defer unsub()

for msg := range ch {
    sse := tavern.NewSSEMessage(msg.Topic, msg.Data).String()
    fmt.Fprint(w, sse)
}
Hierarchical topics with glob wildcards (SubscribeGlob)

Pattern-based subscriptions across topic hierarchies. Topics use / as the separator; * matches one segment, ** matches zero or more:

// All services under monitoring
ch, unsub := broker.SubscribeGlob("monitoring/services/*")
defer unsub()

// Everything under monitoring at any depth
ch, unsub := broker.SubscribeGlob("monitoring/**")
defer unsub()

Messages arrive as TopicMessage values tagged with the actual publish topic.


Publishing variants

Hypertext is the simultaneous presentation of information and controls such that the information BECOMES THE AFFORDANCE through which choices are obtained and actions are selected.

-- The Wisdom of the Uniform Interface, The Dothog Manifesto

PublishWithReplay / PublishWithID / SubscribeFromID

Cache recent messages so new subscribers get them on connect:

broker.SetReplayPolicy("activity", 10) // keep last 10
broker.PublishWithReplay("activity", msg)

Track message IDs for gap-free reconnection:

broker.PublishWithID("events", "evt-42", msg)

// On reconnect, browser sends Last-Event-ID -- replay only missed messages
ch, unsub := broker.SubscribeFromID("events", lastEventID)
PublishIfChanged

Content-based deduplication using FNV-64a hashing. Only publishes when the message actually differs:

broker.PublishIfChanged("dashboard", renderDashboard())
PublishDebounced / PublishThrottled
// Wait for 200ms of quiet, then publish the final value
broker.PublishDebounced("search-results", html, 200*time.Millisecond)

// At most once per second, first call immediate
broker.PublishThrottled("live-stats", html, time.Second)
PublishWithTTL

Ephemeral messages that auto-expire from the replay cache. Current subscribers get them immediately; new subscribers only see them if the TTL hasn't elapsed:

// Toast notification that expires in 5 seconds
broker.PublishWithTTL("toasts", toastHTML, 5*time.Second,
    tavern.WithAutoRemove("toast-42"), // sends OOB delete on expiry
)

Also available: PublishOOBWithTTL, PublishToWithTTL, PublishIfChangedWithTTL.

Batch publishing (Batch / Flush)

Buffer multiple publishes and deliver them as a single write per subscriber:

batch := broker.Batch()
batch.PublishOOB("dashboard", tavern.Replace("stats", statsHTML))
batch.PublishOOB("dashboard", tavern.Replace("chart", chartHTML))
batch.PublishOOB("dashboard", tavern.Replace("activity", feedHTML))
batch.Flush() // one atomic write per subscriber

Batches also support PublishWithTTL and PublishWithID for ephemeral and resumable messages (these execute immediately rather than buffering, since the TTL sweeper and ID tracker require instant processing).


OOB (out-of-band) fragments

The whole point -- the ENTIRE POINT -- of hypermedia is that the server tells the client what to do next IN THE RESPONSE ITSELF.

-- The Wisdom of the Uniform Interface, The Dothog Manifesto

OOB swaps are SSE's answer to this. The server sends the exact DOM mutations to apply:

broker.PublishOOB("events",
    tavern.Replace("stats-bar", "<span>42</span>"),
    tavern.Delete("task-row-5"),
    tavern.Append("activity-feed", "<li>New item</li>"),
    tavern.Prepend("alert-list", "<li>Alert!</li>"),
)
Component interface (templ integration)

Component renders itself to a writer. The interface matches templ.Component exactly -- pass templ components directly, no imports needed:

broker.PublishOOB("events",
    tavern.ReplaceComponent("stats-bar", views.StatsBar(stats)),
    tavern.AppendComponent("feed", views.FeedItem(item)),
)

If rendering fails, the fragment contains an HTML comment with the error rather than a partial render.

Lazy rendering (PublishLazyOOB)

Skip expensive rendering when nobody is listening:

broker.PublishLazyOOB("dashboard", func() []tavern.Fragment {
    stats := fetchStats(db) // only runs if someone is subscribed
    return []tavern.Fragment{
        tavern.ReplaceComponent("stats", views.StatsPanel(stats)),
    }
})

// With deduplication
broker.PublishLazyIfChangedOOB("dashboard", func() []tavern.Fragment { ... })
PublishOOBWithTTL

Ephemeral OOB fragments:

broker.PublishOOBWithTTL("toasts", 5*time.Second,
    tavern.Replace("toast-area", toastHTML),
)

SSE handlers

SSEHandler

The built-in handler sets SSE headers, handles Last-Event-ID resumption, and streams messages with flush:

mux.Handle("/sse/events", broker.SSEHandler("events"))
StreamSSE (composable primitive)

For routes that need a custom subscription flow — scoped, filtered, multi-topic, or with a bespoke message-to-frame encoder — use StreamSSE. It sits between raw subscription channels and the turnkey SSEHandler, handling headers, http.Flusher checks, context cancellation, optional snapshots, and optional heartbeats, while leaving subscription choice and encoding at the call site.

ch, unsub := broker.SubscribeScoped("orders", userID)
defer unsub()
return tavern.StreamSSE(r.Context(), w, ch, func(s string) string { return s })

Pair with WithStreamSnapshot to deliver initial state, WithStreamHeartbeat to keep per-connection keepalives flowing, or WithStreamWriter to plug in a custom frame writer (e.g. htmx-go).

Topic groups (GroupHandler / DynamicGroupHandler)

Serve multiple topics on a single SSE connection:

// Static group -- same topics for everyone
broker.DefineGroup("dashboard", []string{"stats", "alerts", "activity"})
mux.Handle("/sse/dashboard", broker.GroupHandler("dashboard"))

// Dynamic group -- per-request topic resolution (authorization, etc.)
broker.DynamicGroup("user-dashboard", func(r *http.Request) []string {
    user := auth.FromContext(r.Context())
    return topicsForRole(user.Role)
})
mux.Handle("/sse/user", broker.DynamicGroupHandler("user-dashboard"))
SSEHandler vs GroupHandler message format

SSEHandler and GroupHandler expect different message formats:

  • SSEHandler writes messages verbatim — callers pre-format with NewSSEMessage(event, data).String().
  • GroupHandler wraps messages automatically, using the topic name as the SSE event type.

GroupHandler detects pre-formatted SSE messages (those starting with event: or data:) and extracts the data payload before re-wrapping with the topic. This prevents double-wrapping when migrating from SSEHandler to GroupHandler, or when the same publish call serves both handler types.

// Both of these produce correct output through a GroupHandler:
broker.Publish("alerts", "disk-full")                              // raw string
broker.Publish("alerts", NewSSEMessage("alert", "disk-full").String()) // pre-formatted

// GroupHandler output in both cases:
//   event: alerts
//   data: disk-full

Control events (tavern-reconnected, tavern-replay-gap, etc.) always pass through unchanged regardless of format.

When using HTMX with GroupHandler, set sse-swap attributes to match topic names (the SSE event type), not the original event names from NewSSEMessage.

Snapshot + delta (SubscribeWithSnapshot)

Send a computed snapshot as the first message, then live updates. Eliminates the dual-render pattern:

ch, unsub := broker.SubscribeWithSnapshot("dashboard", func() string {
    return renderFullDashboard()
})
defer unsub()
// First message is the snapshot, then live publishes follow
Connection lifetime (WithMaxConnectionDuration)

Cap how long an SSE connection stays open. After the configured duration (plus 0–10% random jitter to prevent thundering herd), the handler sends a retry directive and closes. The browser's EventSource reconnects automatically with Last-Event-ID, so resumption is seamless:

mux.Handle("/sse/events", broker.SSEHandler("events",
    tavern.WithMaxConnectionDuration(5*time.Minute),
))

Works with SSEHandler, GroupHandler, and DynamicGroupHandler. Zero or negative duration disables the limit.


Subscriber management

Metadata (SubscribeWithMeta)

Tag subscribers for admin panels and debugging:

ch, unsub := broker.SubscribeWithMeta("dashboard", tavern.SubscribeMeta{
    ID:   sessionID,
    Meta: map[string]string{"user": userName, "addr": remoteAddr},
})
defer unsub()

subs := broker.Subscribers("dashboard")
broker.Disconnect("dashboard", sessionID) // force disconnect
Subscriber filtering (SubscribeWithFilter)

Per-subscriber message filtering in the publish path:

ch, unsub := broker.SubscribeWithFilter("activity", func(msg string) bool {
    return strings.Contains(msg, userID) // only this user's activity
})
defer unsub()

Non-matching messages are silently skipped without counting toward drops or backpressure.

Per-subscriber rate limiting (SubscribeWithRate)
ch, unsub := broker.SubscribeWithRate("live-data", tavern.Rate{
    MaxPerSecond: 5, // at most 5 msg/s to this subscriber
})
defer unsub()

Messages faster than the rate are held; the most recent held message is delivered when the interval elapses (latest-wins). Does not affect other subscribers.

Server-initiated subscription changes (AddTopic / RemoveTopic)

Dynamically modify a subscriber's topic set without reconnecting:

// Add a topic -- subscriber starts receiving it immediately
broker.AddTopic(subscriberID, "new-topic", true) // true = send control event

// Remove a topic
broker.RemoveTopic(subscriberID, "old-topic", true)

// Scope-wide changes
broker.AddTopicForScope("admin", "audit-log", true)

A tavern-topics-changed control event notifies the client so it can set up new SSE-swap targets.

Message coalescing (SubscribeWithCoalescing)

Latest-value-wins subscription for high-frequency data. When multiple messages arrive before the subscriber reads, only the most recent value is delivered -- stale values are replaced, not queued. Coalesced messages do not count as drops. Ideal for stock tickers, sensor readings, or any feed where intermediate values are irrelevant:

ch, unsub := broker.SubscribeWithCoalescing("prices:AAPL")
defer unsub()

Also available: SubscribeScopedWithCoalescing for scoped variants.

Connection events (WithConnectionEvents)

Publish subscribe/unsubscribe as SSE events on a meta topic:

broker := tavern.NewSSEBroker(tavern.WithConnectionEvents("_meta"))

ch, unsub := broker.Subscribe("_meta")
// Receives: {"event":"subscribe","topic":"dashboard","subscribers":3}
// Receives: {"event":"unsubscribe","topic":"dashboard","subscribers":2}

The meta topic does not generate recursive events for its own subscribers.

Composable subscribe options (SubscribeWith)

Instead of picking the right SubscribeWith* variant, compose capabilities with option functions:

ch, unsub := broker.SubscribeWith("topic",
    tavern.SubWithScope("user:123"),
    tavern.SubWithFilter(predicate),
    tavern.SubWithRate(tavern.Rate{MaxPerSecond: 1}),
    tavern.SubWithMeta(tavern.SubscribeMeta{ID: sessionID}),
    tavern.SubWithSnapshot(renderFull),
)
defer unsub()

The same pattern works for multi-topic and glob subscriptions:

ch, unsub := broker.SubscribeMultiWith(
    []string{"orders", "inventory"},
    tavern.SubWithFilter(predicate),
    tavern.SubWithRate(tavern.Rate{MaxPerSecond: 10}),
)

ch, unsub := broker.SubscribeGlobWith("monitoring/**",
    tavern.SubWithScope("region:us-east"),
)

Reactive hooks

After hooks (topic dependencies)

Fire callbacks after a successful publish to chain dependent updates:

broker.After("orders", func() {
    broker.PublishOOB("dashboard",
        tavern.ReplaceComponent("order-count", views.OrderCount(db)),
    )
})

Hooks run asynchronously in a new goroutine. Cycle detection prevents infinite loops (max depth 8, skips already-visited topics in the chain).

OnMutate / NotifyMutate

Decouple mutation signals from specific topics. Register handlers for logical resources, trigger them from your business logic:

broker.OnMutate("orders", func(evt tavern.MutationEvent) {
    order := evt.Data.(*Order)
    broker.PublishOOB("order-detail",
        tavern.ReplaceComponent("order-"+order.ID, views.OrderRow(order)),
    )
    broker.PublishOOB("dashboard",
        tavern.ReplaceComponent("order-stats", views.OrderStats(db)),
    )
})

// In your handler:
broker.NotifyMutate("orders", tavern.MutationEvent{ID: orderID, Data: order})
Publish middleware (Use / UseTopics)

Intercept, transform, or swallow publishes:

// Global middleware -- runs on every publish
broker.Use(func(next tavern.PublishFunc) tavern.PublishFunc {
    return func(topic, msg string) {
        slog.Info("publish", "topic", topic, "size", len(msg))
        next(topic, msg)
    }
})

// Topic-scoped -- wildcards with ":" separator
broker.UseTopics("admin:*", func(next tavern.PublishFunc) tavern.PublishFunc {
    return func(topic, msg string) {
        auditLog(topic, msg)
        next(topic, msg)
    }
})

Publish ordering

By default, concurrent publishes to the same topic may interleave freely -- no lock, no overhead. When message ordering matters (chat rooms, audit logs), opt in per topic:

broker.SetOrdered("chat:session:123", true)

Ordered topics serialize concurrent publishes through a per-topic mutex so all subscribers observe the same sequence. Disable it when you no longer need the guarantee:

broker.SetOrdered("chat:session:123", false)

Zero overhead for non-ordered topics. The ordering lock is only acquired when the topic is explicitly marked.


Reconnection and resilience

Pluggable replay storage (ReplayStore)

By default the broker keeps replay entries in memory. Plug in a ReplayStore to persist them across restarts or share them across instances:

store := tavern.NewMemoryReplayStore() // built-in in-memory implementation
broker := tavern.NewSSEBroker(tavern.WithReplayStore(store))
broker.SetReplayPolicy("events", 50)
broker.PublishWithID("events", "evt-1", msg)

// On reconnect, replayed from the store
ch, unsub := broker.SubscribeFromID("events", lastEventID)

Implement the ReplayStore interface for durable backends (Redis, Postgres, etc.):

type ReplayStore interface {
    Append(ctx context.Context, topic string, entry ReplayEntry) error
    AfterID(ctx context.Context, topic, lastID string, limit int) ([]ReplayEntry, bool, error)
    Latest(ctx context.Context, topic string, limit int) ([]ReplayEntry, error)
    DeleteTopic(ctx context.Context, topic string) error
    SetMaxEntries(ctx context.Context, topic string, n int) error
}

IDs are topic-scoped. TTL filtering happens at read time — stores must not return expired entries. AfterID returns found=false when the requested ID has been evicted; the broker treats this as a gap.

Replay gap detection (OnReplayGap / SetReplayGapPolicy)

Handle reconnections where the client's Last-Event-ID has rolled out of the replay log. Gap detection requires ID-backed replay — the topic must receive messages via PublishWithID (or PublishWithTTL) so that event IDs exist in the replay log. Without ID-backed publishes, SetReplayGapPolicy has no effect.

// Enable ID-backed replay so gap detection is meaningful.
broker.SetReplayPolicy("dashboard", 100)

broker.OnReplayGap("dashboard", func(sub *tavern.SubscriberInfo, lastID string) {
    slog.Warn("replay gap", "subscriber", sub.ID, "lastID", lastID)
})

// Fall back to a full snapshot when a gap is detected
broker.SetReplayGapPolicy("dashboard", tavern.GapFallbackToSnapshot, func() string {
    return renderFullDashboard()
})
Reconnection UX (OnReconnect / BundleOnReconnect)
broker.OnReconnect("dashboard", func(info tavern.ReconnectInfo) {
    slog.Info("reconnect", "topic", info.Topic, "gap", info.Gap, "missed", info.MissedCount)
    // Send a welcome-back message directly to this subscriber
    info.SendToSubscriber(tavern.NewSSEMessage("reconnected", "welcome back").String())
})

// Bundle replay messages into a single write to reduce DOM churn
broker.SetBundleOnReconnect("dashboard", true)
Buffer sizing for replay

The subscriber buffer (WithBufferSize) and the replay window (SetReplayPolicy) serve different purposes:

  • Replay window determines how many past messages Tavern retains for Last-Event-ID resumption.
  • Buffer size determines how many messages can be queued to a subscriber channel at once — including replay messages delivered on reconnect.

During reconnect, Tavern enqueues all eligible replay messages into the subscriber channel using non-blocking sends. If the replay burst exceeds the available buffer capacity, excess messages are dropped and a tavern-replay-truncated control event is emitted with the delivery counts.

Rule of thumb: if you expect reconnect bursts of up to N missed messages, set the buffer size to at least N plus headroom for control events and concurrent live publishes:

broker := tavern.NewSSEBroker(
    tavern.WithBufferSize(64), // enough for reconnect bursts up to ~60 messages
)
broker.SetReplayPolicy("dashboard", 50)
Scenario Suggested buffer size
Small replay windows (≤ 10 messages) Default (10) is fine
Demo / test with intentional reconnect gaps 64128
Production with large replay windows At least replay window size + 10–20 headroom

Note: SetBundleOnReconnect combines all replay messages into a single channel write, which avoids per-message buffer pressure. When bundling is enabled, buffer size only needs to accommodate the single bundled write plus control events.

Adaptive backpressure

Tiered response to slow subscribers -- throttle, simplify, then disconnect:

broker := tavern.NewSSEBroker(
    tavern.WithAdaptiveBackpressure(tavern.AdaptiveBackpressure{
        ThrottleAt:   5,   // deliver every 2nd message
        SimplifyAt:   20,  // apply simplified renderer
        DisconnectAt: 50,  // evict the subscriber
    }),
)

// Register a lightweight renderer for the simplify tier
broker.SetSimplifiedRenderer("dashboard", func(msg string) string {
    return `<div id="dashboard">Loading...</div>`
})

// Get notified on tier changes
broker.OnBackpressureTierChange(func(sub *tavern.SubscriberInfo, old, new tavern.BackpressureTier) {
    slog.Warn("backpressure", "subscriber", sub.ID, "old", old, "new", new)
})
Slow subscriber eviction

Simple threshold-based eviction without the full adaptive tier system:

broker := tavern.NewSSEBroker(
    tavern.WithSlowSubscriberEviction(100),
    tavern.WithSlowSubscriberCallback(func(topic string) {
        slog.Warn("slow subscriber evicted", "topic", topic)
    }),
)
Backpressure signaling (OnPublishDrop / PublishBlocking)

Get notified when messages are dropped, or block instead of dropping:

broker.OnPublishDrop(func(topic string, count int) {
    slog.Warn("messages dropped", "topic", topic, "count", count)
    metrics.IncrCounter("sse.drops", count)
})

For topics where loss is unacceptable, block until the subscriber catches up or a timeout elapses:

err := broker.PublishBlocking("audit-log", entry, 5*time.Second)
if errors.Is(err, tavern.ErrPublishTimeout) {
    // at least one subscriber couldn't keep up
}

Also available: PublishBlockingTo for scoped subscribers. A zero timeout falls back to non-blocking behavior.

App-shell lifeline architecture

For apps that go beyond page-local SSE, Tavern supports a lifeline + scoped streams pattern: one warm connection stays open for the life of the app shell, while optional high-bandwidth streams spin up and down as the user navigates.

Lifeline (control plane) -- always connected, low volume:

  • notifications, presence, nav-state changes
  • invalidation signals ("data changed, refetch when ready")
  • small counters, theme broadcasts

Scoped streams (data plane) -- active only for hot views:

  • charts, feeds, detail panels via SubscribeScoped / PublishTo
  • high-frequency section-specific updates with per-user isolation
Handoff with AddTopic / RemoveTopic

Use a single SSE connection with dynamic topic membership. Requires a SubscribeMultiWithMeta subscriber so AddTopic / RemoveTopic can target it by ID:

// Lifeline starts with control-plane topics.
ch, unsub := broker.SubscribeMultiWithMeta(
    tavern.SubscribeMeta{ID: sessionID},
    "notifications", "nav-state",
)

// User opens dashboard -- add its data topic.
broker.AddTopic(sessionID, "dashboard-data", true)
// Client receives tavern-topics-changed control event.

// User leaves dashboard -- remove the topic.
broker.RemoveTopic(sessionID, "dashboard-data", true)
Separate scoped connection

When a view needs independent backpressure, per-user scoping, or its own buffer/eviction policy, use a separate SSE connection with SubscribeScoped. This works well under HTTP/2 and HTTP/3 where additional streams are cheap:

// Main lifeline handler
mux.Handle("/sse/app", broker.SSEHandler("control"))

// Scoped panel stream -- per-user isolation with independent buffer
// Use SubscribeScoped in a custom handler, or separate SSEHandler per panel:
mux.Handle("/sse/panel", broker.SSEHandler("panel-data",
    tavern.WithMaxConnectionDuration(10*time.Minute),
))
Replay as fallback, not navigation

Replay and Last-Event-ID resumption exist for network interruptions, not for navigating between views. Normal navigation should add/remove topics or create/tear down scoped streams. Replay kicks in automatically if a connection drops and the browser reconnects.

Anti-patterns
Pattern Problem
Firehose -- one connection subscribed to every topic Buffer overflows, drops, backpressure issues
Reconnect-as-navigation -- tear down SSE on every route change Unnecessary latency, missed events during reconnect window
Duplicate DOM ownership -- two streams updating the same element Race conditions, flicker, unpredictable state

Rendering on hot pages: Transport backpressure and browser render cadence are separate concerns. If delivery metrics look healthy but the page stutters, see Recipe 28: Browser-safe rendering for high-frequency SSE.


Error handling

OnRenderError callback

Centralized error handling for render failures in scheduled publishers:

broker.OnRenderError(func(err *tavern.RenderError) {
    slog.Error("render failed",
        "topic", err.Topic,
        "section", err.Section,
        "error", err.Err,
        "count", err.Count,
    )
})
Circuit breaker for ScheduledPublisher

Protect scheduled sections from cascading failures:

pub.Register("services", 3*time.Second, renderServices, tavern.SectionOptions{
    CircuitBreaker: &tavern.CircuitBreakerConfig{
        FailureThreshold: 3,
        RecoveryInterval: 30 * time.Second,
        FallbackRender: func() string {
            return `<div id="services">Service data temporarily unavailable</div>`
        },
    },
})

After 3 consecutive failures, the circuit opens and renders the fallback. After 30 seconds, a probe request tests recovery.


Scheduled publishing

ScheduledPublisher manages multiple named sections with independent intervals. It ticks on a fast base interval, renders due sections into a shared buffer, and publishes one batched message per tick. Skips rendering when no subscribers are listening.

pub := broker.NewScheduledPublisher("dashboard", tavern.WithBaseTick(100*time.Millisecond))

pub.Register("network", 1*time.Second, func(ctx context.Context, buf *bytes.Buffer) error {
    return views.NetworkChart(snap).Render(ctx, buf)
})
pub.Register("services", 3*time.Second, func(ctx context.Context, buf *bytes.Buffer) error {
    return views.ServicesPanel(services).Render(ctx, buf)
})

broker.RunPublisher(ctx, pub.Start)

// Runtime interval changes
pub.SetInterval("network", 500*time.Millisecond)

RunPublisher launches a publisher goroutine with panic recovery, tracked by the broker's WaitGroup so Close() waits for all publishers to return.


Observability

Basic stats
if broker.HasSubscribers("system-stats") {
    broker.Publish("system-stats", renderStats())
}

counts := broker.TopicCounts()           // map[string]int
total := broker.SubscriberCount()        // int
drops := broker.PublishDrops()           // int64

s := broker.Stats()
// BrokerStats{Topics: int, Subscribers: int, PublishDrops: int64}
Per-topic metrics (WithMetrics)

Opt-in publish and drop counters per topic:

broker := tavern.NewSSEBroker(tavern.WithMetrics())

m := broker.Metrics()
for topic, stats := range m.TopicStats {
    fmt.Printf("%s: published=%d dropped=%d peak_subs=%d\n",
        topic, stats.Published, stats.Dropped, stats.PeakSubscribers)
}
Enhanced observability (WithObservability)

Latency histograms, subscriber lag, throughput, and connection durations:

broker := tavern.NewSSEBroker(tavern.WithObservability(tavern.ObservabilityConfig{
    PublishLatency:     true,
    SubscriberLag:      true,
    ConnectionDuration: true,
    TopicThroughput:    true,
}))

obs := broker.Observability()
p99 := obs.PublishLatencyP99("dashboard")
lag := obs.SubscriberLag("dashboard", broker)
rate := obs.TopicThroughput("dashboard")
snap := obs.Snapshot(broker) // all topics at once

Zero overhead when not configured.

Observability export

Ship Tavern's delivery metrics to Prometheus or OpenTelemetry without polluting core with heavy dependencies. The server knows what it sent; now your dashboards know too.

import "github.com/catgoose/tavern/tavernprom"

// Register with an existing Prometheus registerer.
unreg, err := tavernprom.Register(broker, prometheus.DefaultRegisterer)
defer unreg()

// Or get a standalone /metrics handler.
http.Handle("/metrics", tavernprom.Handler(broker))
import "github.com/catgoose/tavern/tavernotel"

// Register with an OpenTelemetry MeterProvider.
stop, err := tavernotel.Register(broker, otel.GetMeterProvider())
defer stop()

Both adapters are poll-based (Prometheus scrapes, OTel collection callbacks), export the same logical metrics (published, dropped, latency, throughput, evictions, connection durations), and support topic cardinality limiting to prevent label explosion. See tavernprom/ and tavernotel/ for full API docs.


Testing

The taverntest subpackage provides test helpers:

import "github.com/catgoose/tavern/taverntest"

// Recorder -- subscribe and collect messages
rec := taverntest.NewRecorder(broker, "events")
defer rec.Close()
rec.WaitFor(3, time.Second)
rec.AssertCount(t, 3)
rec.AssertContains(t, "expected-message")

// Capture -- declarative assertions
cap := taverntest.NewCapture(broker, "events")
defer cap.Close()
cap.WaitFor(2, time.Second)
cap.AssertMessages(t, "first", "second")

// MockBroker -- record publishes without a real broker
mock := taverntest.NewMockBroker()
mock.Publish("events", "msg")
mock.AssertPublished(t, "events", "msg")

// SlowSubscriber -- test backpressure and eviction
slow := taverntest.NewSlowSubscriber(broker, "events", taverntest.SlowSubscriberConfig{
    ReadDelay: 100 * time.Millisecond,
})
defer slow.Close()

// SimulatedConnection -- test reconnection and Last-Event-ID
conn := taverntest.NewSimulatedConnection(broker, "events")
conn.Disconnect()
conn.Reconnect()
conn.AssertReconnectMessages(t, ...)

// SSERecorder -- capture SSE wire output for handler tests
rec := taverntest.NewSSERecorder()
handler.ServeHTTP(rec, req)
rec.AssertEventCount(t, 3)
rec.AssertEvent(t, 0, taverntest.SSEEvent{Event: "update", Data: "hello"})

Subpackages

presence/ -- Structured presence tracking

Heartbeat-based presence with stale detection and optional OOB publishing:

import "github.com/catgoose/tavern/presence"

tracker := presence.New(broker, presence.Config{
    StaleTimeout: 30 * time.Second,
    RenderFunc: func(topic string, users []presence.Info) string {
        return renderPresenceList(users)
    },
    OnJoin:  func(topic string, info presence.Info) { /* ... */ },
    OnLeave: func(topic string, info presence.Info) { /* ... */ },
})
defer tracker.Close()

tracker.Join("doc-123", presence.Info{UserID: userID, Name: userName})
tracker.Heartbeat("doc-123", userID)
tracker.Update("doc-123", userID, map[string]any{"cursor": pos})
tracker.Leave("doc-123", userID)

users := tracker.List("doc-123")
tavernprom/ -- Prometheus export

Exports Tavern metrics as Prometheus metrics via the prometheus.Collector interface. Metrics are computed on scrape -- no background goroutines. Supports namespace prefixing and topic cardinality limiting.

import "github.com/catgoose/tavern/tavernprom"

unreg, err := tavernprom.Register(broker, prometheus.DefaultRegisterer)
tavernotel/ -- OpenTelemetry export

Exports Tavern metrics as OTel observable instruments. Callbacks fire during SDK collection cycles. Same logical metrics as the Prometheus adapter.

import "github.com/catgoose/tavern/tavernotel"

stop, err := tavernotel.Register(broker, otel.GetMeterProvider())
backend/ -- Distributed fan-out interface

The backend.Backend interface enables cross-process fan-out. Publishes on one broker instance reach subscribers on another. Message envelopes carry optional TTL and ID fields so replay semantics survive the trip across instances.

Backends can optionally implement HealthAwareBackend for health checking and automatic re-subscription on reconnect, or ObservableBackend to expose operational metrics (connected state, messages sent/received).

backend/memory/ -- In-process backend for testing

Simulate multi-instance deployments in tests:

import "github.com/catgoose/tavern/backend/memory"

mem := memory.New()
fork := mem.Fork() // shares the same message bus

broker1 := tavern.NewSSEBroker(tavern.WithBackend(mem))
broker2 := tavern.NewSSEBroker(tavern.WithBackend(fork))

// publish on broker1, subscribers on broker2 receive it

Configuration

NewSSEBroker accepts functional options:

Option Default Description
WithBufferSize(n) 10 Subscriber channel buffer capacity. Also limits how many replay messages can be queued during reconnect — see Buffer sizing for replay
WithDropOldest() drop newest Discard oldest queued message when buffer full
WithKeepalive(d) disabled Send SSE comment keepalives at interval
WithTopicTTL(d) disabled Auto-remove topics with no subscribers after TTL
WithSlowSubscriberEviction(n) disabled Evict after n consecutive drops
WithAdaptiveBackpressure(cfg) disabled Tiered backpressure (throttle/simplify/disconnect)
WithMaxSubscribers(n) unlimited Global cap on total concurrent subscribers
WithMaxSubscribersPerTopic(n) unlimited Per-topic cap on concurrent subscribers
WithAdmissionControl(fn) nil Custom predicate called on every subscribe attempt
WithMetrics() disabled Per-topic publish/drop counters
WithObservability(cfg) disabled Latency, lag, throughput, connection duration
WithConnectionEvents(topic) disabled Publish subscribe/unsubscribe events
WithMessageTTLSweep(d) 1s Interval for expired TTL entry cleanup
WithReplayStore(store) nil Pluggable replay persistence backend
WithLogger(l) nil Log panics and errors via slog
WithBackend(b) nil Cross-process fan-out backend

Handler options (passed to SSEHandler / GroupHandler / DynamicGroupHandler):

Option Default Description
WithSSEWriter(fn) default writer Custom write function for SSE messages
WithMaxConnectionDuration(d) disabled Graceful connection recycling with jitter
Connection admission control

Protect your broker from unbounded subscriber growth:

broker := tavern.NewSSEBroker(
    tavern.WithMaxSubscribers(10000),
    tavern.WithMaxSubscribersPerTopic(1000),
)

When a limit is reached, Subscribe returns nil and SSEHandler returns HTTP 503 Service Unavailable. For custom logic (per-tenant quotas, feature flags), use WithAdmissionControl:

broker := tavern.NewSSEBroker(
    tavern.WithAdmissionControl(func(topic string, currentCount int) bool {
        return currentCount < tenantLimit(topic)
    }),
)

Additional runtime configuration:

broker.SetReplayPolicy("topic", 10)        // replay cache size
broker.SetRetry("topic", 30*time.Second)   // client reconnect delay
broker.SetRetryAll(30*time.Second)          // all topics
broker.OnRenderError(func(err *tavern.RenderError) { ... })

SSE message format

msg := tavern.NewSSEMessage("update", `{"id":1}`).String()
// event: update\ndata: {"id":1}\n\n

msg := tavern.NewSSEMessage("update", data).WithID("42").WithRetry(5000).String()
// event: update\ndata: ...\nid: 42\nretry: 5000\n\n

Thread safety

All SSEBroker methods are safe for concurrent use. The broker uses sync.RWMutex internally: subscribing and unsubscribing take a write lock, publishing and reading counts take a read lock. Publish snapshots the subscriber set under the read lock, then sends outside it, so publishers never block each other.


Philosophy

Tavern follows the dothog design philosophy and the Dothog Manifesto: the server owns the representation, Tavern delivers it honestly, and sync.RWMutex is the only dependency you need for thread safety.

wife of Grug say from cave: "easy, easy, easy. like touching feet to ground when get out of bed. server return html. browser render html. what is difficult?"

-- The Recorded Sayings of Layman Grug, The Dothog Manifesto

Server publish event. Browser receive event. Tavern carry the voice. What is difficult?

SSE is the server telling the client what happened next, in real time. The event stream is just another representation -- the server speaks, the client listens, and nobody had to install an npm package to make it work.


Architecture

  handler --> broker.Publish("topic", msg)
                      |
                      +---> subscriber A (chan) ---> SSE endpoint ---> browser A
                      +---> subscriber B (chan) ---> SSE endpoint ---> browser B
                      +---> subscriber C (chan) ---> SSE endpoint ---> browser C

Benchmarks

Run the benchmark suite:

go test -bench=. -benchmem ./...

Covers fan-out throughput (1, 10, 100, 1000 subscribers), publish-to-receive latency, memory per subscriber, feature overhead (middleware, observability, backpressure, filter, ordering, coalescing), concurrent publish (ordered and unordered), batch flush, scoped publish, and content-based dedup.


License

MIT

Documentation

Overview

Package tavern provides a thread-safe, topic-based pub/sub broker for Server-Sent Events (SSE). It is designed for fan-out messaging where a server publishes events and multiple HTTP clients consume them via SSE streams.

All broker methods are safe for concurrent use by multiple goroutines.

Index

Examples

Constants

View Source
const (
	// TopicActivityFeed is a conventional topic name for site-wide activity feeds.
	TopicActivityFeed = "activity-feed"
)

Topic name constants are conventions for common real-time use cases. Any string works as a topic name; these are provided for consistent naming.

Variables

View Source
var ErrPublishTimeout = errors.New("tavern: publish timeout")

ErrPublishTimeout is returned by SSEBroker.PublishBlocking and SSEBroker.PublishBlockingTo when at least one subscriber's channel could not accept the message within the configured timeout.

Functions

func RenderComponent added in v0.4.28

func RenderComponent(cmp Component) string

RenderComponent renders a Component to a string. If rendering fails, it returns an HTML comment containing the escaped error message.

func RenderComponentErr added in v0.4.42

func RenderComponentErr(cmp Component) (string, error)

RenderComponentErr renders a Component to a string, returning the error separately instead of embedding it in an HTML comment. This is useful when you want to handle render errors explicitly rather than silently embedding them in the output.

func RenderFragments added in v0.3.2

func RenderFragments(fragments ...Fragment) string

RenderFragments concatenates fragments into a single SSE-ready HTML string. Each fragment is wrapped with hx-swap-oob for HTMX OOB processing.

func StreamSSE added in v0.4.74

func StreamSSE[T any](
	ctx context.Context,
	w http.ResponseWriter,
	ch <-chan T,
	encode func(T) string,
	opts ...StreamSSEOption,
) error

StreamSSE prepares an SSE response on w and streams frames from ch until ctx is cancelled or ch is closed. encode converts each channel value into the SSE frame to write; returning an empty string skips the value without terminating the stream.

StreamSSE sits between raw subscription channels and the higher-level SSEBroker.SSEHandler. It handles the mechanical parts of an SSE response — standard headers, http.Flusher verification, context cancellation, optional snapshot delivery, and optional heartbeats — while leaving the choice of subscription API, filtering, and message-to-frame conversion at the call site. It does not subscribe on your behalf.

Compose with any subscription API:

ch, unsub := broker.SubscribeScoped("orders", userID)
defer unsub()
return tavern.StreamSSE(r.Context(), w, ch, func(s string) string {
    return s
})

For multiplexed channels that carry TopicMessage, the encoder typically wraps each value with the topic as the event type:

ch, unsub := broker.SubscribeMulti("orders", "invoices")
defer unsub()
return tavern.StreamSSE(r.Context(), w, ch, func(tm tavern.TopicMessage) string {
    return tavern.NewSSEMessage(tm.Topic, tm.Data).String()
})

StreamSSE returns an error if w does not implement http.Flusher or if encode is nil. On normal termination — context cancellation, channel close, or write failure — it returns nil.

Types

type AdaptiveBackpressure added in v0.4.46

type AdaptiveBackpressure struct {
	// ThrottleAt is the consecutive drop count that triggers throttle tier.
	// In throttle tier the broker delivers every 2nd message to the subscriber.
	ThrottleAt int
	// SimplifyAt is the consecutive drop count that triggers simplify tier.
	// In simplify tier the broker attaches a fidelity hint and optionally
	// applies a simplified renderer registered for the topic.
	SimplifyAt int
	// DisconnectAt is the consecutive drop count that triggers eviction.
	DisconnectAt int
}

AdaptiveBackpressure configures tiered backpressure thresholds based on consecutive drop counts. Each threshold must be greater than the previous; a zero value disables that tier.

type BackpressureTier added in v0.4.46

type BackpressureTier int

BackpressureTier represents the current backpressure tier of a subscriber. Tiers escalate based on consecutive message drop counts.

const (
	// TierNormal means messages are delivered normally (0 consecutive drops).
	TierNormal BackpressureTier = iota
	// TierThrottle means the subscriber is receiving every Nth message.
	TierThrottle
	// TierSimplify means the subscriber receives simplified/lower-fidelity content.
	TierSimplify
	// TierDisconnect means the subscriber will be evicted.
	TierDisconnect
)

func (BackpressureTier) String added in v0.4.46

func (t BackpressureTier) String() string

String returns the tier name.

type BrokerMetrics added in v0.4.31

type BrokerMetrics struct {
	// TopicStats maps topic name to its metrics.
	TopicStats map[string]TopicMetrics
	// TotalPublished is the sum of all per-topic published counts.
	TotalPublished int64
	// TotalDropped is the sum of all per-topic dropped counts.
	TotalDropped int64
}

BrokerMetrics is a point-in-time snapshot of all broker metrics.

type BrokerOption added in v0.2.0

type BrokerOption func(*SSEBroker)

BrokerOption configures the SSE broker.

func WithAdaptiveBackpressure added in v0.4.46

func WithAdaptiveBackpressure(cfg AdaptiveBackpressure) BrokerOption

WithAdaptiveBackpressure enables tiered backpressure that adapts per-subscriber based on their consecutive drop count. This subsumes WithSlowSubscriberEviction: the DisconnectAt threshold acts as the eviction threshold.

Tiers from lowest to highest pressure:

  • Normal (0 drops): messages delivered normally
  • Throttle (≥ThrottleAt drops): delivers every 2nd message
  • Simplify (≥SimplifyAt drops): applies simplified renderer if registered
  • Disconnect (≥DisconnectAt drops): evicts the subscriber

The drop counter resets on any successful send, returning the subscriber to the normal tier automatically.

func WithAdmissionControl added in v0.4.49

func WithAdmissionControl(fn func(topic string, currentCount int) bool) BrokerOption

WithAdmissionControl sets a custom admission function that is called for every new subscription attempt. The function receives the topic name and the current total subscriber count for that topic. It should return true to allow the subscription or false to deny it.

func WithBackend added in v0.4.46

func WithBackend(b backend.Backend) BrokerOption

WithBackend configures the broker to use a cross-process fan-out backend. When set, every Publish also forwards the message to the backend, and the broker automatically subscribes to the backend when the first local subscriber joins a topic and unsubscribes when the last local subscriber leaves.

Messages arriving from the backend are dispatched directly to local subscriber channels — they skip middleware and After hooks to avoid duplicate side-effects across instances.

func WithBufferSize added in v0.2.0

func WithBufferSize(size int) BrokerOption

WithBufferSize sets the subscriber channel buffer size. Default is 10.

func WithConnectionEvents added in v0.4.34

func WithConnectionEvents(metaTopic string) BrokerOption

WithConnectionEvents enables publishing subscriber connect/disconnect events to the given meta topic. Events are JSON-formatted messages containing the event type, topic, and current subscriber count. The meta topic itself does not generate recursive events.

func WithDropOldest added in v0.4.24

func WithDropOldest() BrokerOption

WithDropOldest changes the subscriber buffer strategy from drop-newest (default) to drop-oldest. When a subscriber's buffer is full, the oldest buffered message is discarded to make room for the new one. This is useful for dashboards where the latest data is always more relevant than queued historical data.

func WithKeepalive added in v0.4.20

func WithKeepalive(interval time.Duration) BrokerOption

WithKeepalive enables periodic SSE comment keepalives sent to all subscribers at the given interval. This keeps connections alive through proxies and load balancers that close idle connections. A zero or negative interval disables keepalives (the default).

func WithLogger added in v0.4.18

func WithLogger(l *slog.Logger) BrokerOption

WithLogger sets a structured logger for the broker. When set, publisher panics and errors are logged. Default is nil (no logging).

func WithMaxSubscribers added in v0.4.49

func WithMaxSubscribers(n int) BrokerOption

WithMaxSubscribers sets a global limit on the total number of concurrent subscribers across all topics. Default is 0 (unlimited). When the limit is reached, new Subscribe calls return a nil channel and nil unsubscribe function, and SSEHandler returns HTTP 503 Service Unavailable.

func WithMaxSubscribersPerTopic added in v0.4.49

func WithMaxSubscribersPerTopic(n int) BrokerOption

WithMaxSubscribersPerTopic sets a per-topic limit on the number of concurrent subscribers. Default is 0 (unlimited). When the limit for a topic is reached, new Subscribe calls for that topic return a nil channel and nil unsubscribe function, and SSEHandler returns HTTP 503.

func WithMessageTTLSweep added in v0.4.44

func WithMessageTTLSweep(interval time.Duration) BrokerOption

WithMessageTTLSweep sets the interval at which the background goroutine checks for expired TTL entries in the replay cache. Default is 1 second. A shorter interval provides faster expiry at the cost of more frequent lock acquisitions. This option only takes effect when TTL publishes are used.

func WithMetrics added in v0.4.31

func WithMetrics() BrokerOption

WithMetrics enables per-topic publish and drop counters. When disabled (the default), metrics tracking has zero overhead. Use SSEBroker.Metrics to retrieve a snapshot.

func WithObservability added in v0.4.46

func WithObservability(config ObservabilityConfig) BrokerOption

WithObservability enables enhanced observability with the given configuration. Disabled by default — zero overhead when not configured.

func WithReplayStore added in v0.4.63

func WithReplayStore(store ReplayStore) BrokerOption

WithReplayStore sets an external ReplayStore for persisting replay entries. When configured, the broker delegates all replay read/write operations to the store instead of using its internal in-memory maps. This enables durable replay across process restarts or shared replay across multiple broker instances.

When no store is configured (the default), the broker uses its built-in in-memory replay, preserving existing behavior.

func WithSlowSubscriberCallback added in v0.4.34

func WithSlowSubscriberCallback(fn func(topic string)) BrokerOption

WithSlowSubscriberCallback sets a function that is called when a subscriber is evicted due to slow consumption. The callback receives the topic name and runs in its own goroutine.

func WithSlowSubscriberEviction added in v0.4.34

func WithSlowSubscriberEviction(threshold int) BrokerOption

WithSlowSubscriberEviction enables automatic disconnection of subscribers that have dropped more than threshold consecutive messages. When a subscriber is evicted, its channel is closed, triggering the client's EventSource to reconnect. The counter resets when a message is successfully delivered. A threshold of 0 disables eviction (the default).

func WithTopicTTL added in v0.4.21

func WithTopicTTL(ttl time.Duration) BrokerOption

WithTopicTTL sets how long a topic with zero subscribers may remain in the broker before it is automatically removed. A background goroutine sweeps at half the TTL interval. A zero or negative TTL disables auto-cleanup.

type BrokerStats added in v0.3.0

type BrokerStats struct {
	// Topics is the number of active topics.
	Topics int
	// Subscribers is the total number of active subscribers across all topics.
	Subscribers int
	// PublishDrops is the cumulative number of dropped messages.
	PublishDrops int64
}

BrokerStats is a point-in-time summary of broker state returned by SSEBroker.Stats.

type CircuitBreakerConfig added in v0.4.42

type CircuitBreakerConfig struct {
	// FailureThreshold is the number of consecutive failures before the
	// circuit opens. Must be at least 1.
	FailureThreshold int

	// RecoveryInterval is how long the circuit stays open before trying a
	// half-open probe. Must be positive.
	RecoveryInterval time.Duration

	// FallbackRender is called when the circuit is open. If nil, the section
	// is skipped while the circuit is open.
	FallbackRender func() string
}

CircuitBreakerConfig configures a circuit breaker for a scheduled section. The circuit breaker follows the standard closed/open/half-open state machine.

type Component added in v0.4.11

type Component interface {
	Render(ctx context.Context, w io.Writer) error
}

Component renders itself to a writer. This interface is intentionally identical to templ.Component so that templ components can be passed directly without tavern importing the templ package.

type FilterPredicate added in v0.4.46

type FilterPredicate func(msg string) bool

FilterPredicate is a function that returns true if the message should be delivered to the subscriber. It runs synchronously in the publish goroutine for every message on the topic, so implementations must be fast and non-blocking. Messages rejected by the predicate are silently skipped and do not count toward drop counts or backpressure tiers.

type Fragment added in v0.3.2

type Fragment struct {
	// ID is the target DOM element ID.
	ID string
	// Swap is the hx-swap-oob value: "outerHTML", "innerHTML", "delete",
	// "beforeend", or "afterbegin".
	Swap string
	// HTML is the inner HTML content. Empty for delete operations.
	HTML string
}

Fragment describes a targeted DOM mutation for HTMX OOB swaps via SSE. Use the convenience constructors Replace, Append, Prepend, and Delete instead of building Fragment values directly.

func Append added in v0.3.2

func Append(id, html string) Fragment

Append creates a fragment that appends content to the end of an element.

func AppendComponent added in v0.4.11

func AppendComponent(id string, cmp Component) Fragment

AppendComponent renders a Component and returns an Append fragment. If rendering fails, the fragment contains the error message as an HTML comment.

func Delete added in v0.3.2

func Delete(id string) Fragment

Delete creates a fragment that removes an element from the DOM.

func Prepend added in v0.3.2

func Prepend(id, html string) Fragment

Prepend creates a fragment that prepends content to the beginning of an element.

func PrependComponent added in v0.4.11

func PrependComponent(id string, cmp Component) Fragment

PrependComponent renders a Component and returns a Prepend fragment. If rendering fails, the fragment contains the error message as an HTML comment.

func Replace added in v0.3.2

func Replace(id, html string) Fragment

Replace creates a fragment that replaces an element's outer HTML.

func ReplaceComponent added in v0.4.11

func ReplaceComponent(id string, cmp Component) Fragment

ReplaceComponent renders a Component and returns a Replace fragment. If rendering fails, the fragment contains the error message as an HTML comment.

type GapStrategy added in v0.4.39

type GapStrategy int

GapStrategy determines how the broker responds when a subscriber reconnects with a Last-Event-ID that is no longer in the replay log (i.e., the log has rolled over and the requested ID is gone). Configure per-topic via SSEBroker.SetReplayGapPolicy.

const (
	// GapSilent is the default strategy. When a gap is detected, no replay
	// occurs and the subscriber receives only live messages going forward.
	// This preserves backwards compatibility with the existing behaviour.
	GapSilent GapStrategy = iota

	// GapFallbackToSnapshot uses the configured SnapshotFunc to generate a
	// full-state snapshot and delivers it to the subscriber before live
	// messages begin. This ensures the client can rebuild its state even
	// when the replay log has rolled over.
	GapFallbackToSnapshot
)

type LatencyHistogram added in v0.4.46

type LatencyHistogram struct {
	P50, P95, P99 time.Duration
	Count         int64
}

LatencyHistogram holds percentile latency data computed from a circular buffer of the most recent 1024 samples.

type MemoryReplayStore added in v0.4.63

type MemoryReplayStore struct {
	// contains filtered or unexported fields
}

MemoryReplayStore is an in-memory implementation of ReplayStore. It stores entries in ordered slices per topic with a configurable maximum size. All methods are safe for concurrent use.

func NewMemoryReplayStore added in v0.4.63

func NewMemoryReplayStore() *MemoryReplayStore

NewMemoryReplayStore creates a ready-to-use in-memory replay store.

func (*MemoryReplayStore) AfterID added in v0.4.63

func (m *MemoryReplayStore) AfterID(_ context.Context, topic, lastID string, _ int) ([]ReplayEntry, bool, error)

AfterID returns entries published after the given ID. If lastID is not found in the store, found is false and entries is nil (indicating a gap). Expired entries are filtered out.

func (*MemoryReplayStore) Append added in v0.4.63

func (m *MemoryReplayStore) Append(_ context.Context, topic string, entry ReplayEntry) error

Append stores a replay entry for a topic. If the number of entries exceeds the configured maximum for the topic, the oldest entries are discarded. The default maximum is 1 if not explicitly configured via MemoryReplayStore.SetMaxEntries.

func (*MemoryReplayStore) DeleteTopic added in v0.4.63

func (m *MemoryReplayStore) DeleteTopic(_ context.Context, topic string) error

DeleteTopic removes all replay entries for a topic and its max-size config.

func (*MemoryReplayStore) Latest added in v0.4.63

func (m *MemoryReplayStore) Latest(_ context.Context, topic string, limit int) ([]ReplayEntry, error)

Latest returns the most recent entries for a topic, up to limit. Expired entries are filtered out.

func (*MemoryReplayStore) SetMaxEntries added in v0.4.63

func (m *MemoryReplayStore) SetMaxEntries(_ context.Context, topic string, n int) error

SetMaxEntries configures the maximum number of entries to retain for a topic. If n <= 0, all entries and the limit are removed (equivalent to DeleteTopic). If the current number of entries exceeds n, the oldest are discarded.

type Middleware added in v0.4.41

type Middleware func(next PublishFunc) PublishFunc

Middleware wraps a PublishFunc to add cross-cutting behaviour to the publish pipeline. Middleware is called in registration order (first registered = outermost) and may transform the message, add side-effects, or swallow the publish entirely by not calling next.

type MutationEvent added in v0.4.43

type MutationEvent struct {
	// ID identifies the specific entity that was mutated (e.g., an order ID).
	ID string
	// Data holds the mutated entity or any additional context the handler needs.
	Data any
}

MutationEvent carries context about a resource mutation. It is passed to handlers registered via SSEBroker.OnMutate when SSEBroker.NotifyMutate is called. Resources are logical entities (e.g., "orders") decoupled from topic names.

type ObservabilityConfig added in v0.4.46

type ObservabilityConfig struct {
	// PublishLatency enables per-topic publish latency histograms.
	PublishLatency bool
	// SubscriberLag enables per-subscriber buffer depth gauges.
	SubscriberLag bool
	// ConnectionDuration enables tracking of subscriber connection durations.
	ConnectionDuration bool
	// TopicThroughput enables per-topic message rate calculation.
	TopicThroughput bool
}

ObservabilityConfig controls which observability features are enabled. By default all fields are false and observability has zero overhead. Enable individual features selectively to minimize runtime cost.

type ObservabilitySnapshot added in v0.4.46

type ObservabilitySnapshot struct {
	Topics map[string]TopicObservability
}

ObservabilitySnapshot is a point-in-time, export-friendly snapshot of all observability data across all topics. Obtain one via [observabilityState.Snapshot].

type PublishBatch added in v0.4.39

type PublishBatch struct {
	// contains filtered or unexported fields
}

PublishBatch buffers publish operations and flushes them as a single concatenated write per subscriber channel, reducing the number of SSE writes on the wire. Create one via SSEBroker.Batch. Multiple goroutines may call Publish/PublishTo concurrently, but Flush and Discard must be called at most once and not concurrently with publishes.

func (*PublishBatch) Discard added in v0.4.39

func (pb *PublishBatch) Discard()

Discard clears all buffered operations without sending anything.

Example

ExamplePublishBatch_Discard demonstrates discarding a batch without sending any messages.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe("updates")
	defer unsub()

	batch := broker.Batch()
	batch.Publish("updates", "draft message")
	batch.Discard()

	// Verify nothing was delivered by publishing a sentinel.
	broker.Publish("updates", "after-discard")
	msg := <-ch
	fmt.Println(msg)
}
Output:
after-discard

func (*PublishBatch) Flush added in v0.4.39

func (pb *PublishBatch) Flush()

Flush sends all buffered messages. For each unique (topic, scope) combination the individual messages are concatenated, then routed through the same publish pipeline as SSEBroker.Publish / SSEBroker.PublishTo — middleware, rate limiting, filters, adaptive backpressure, glob subscribers, backend publish, observability, and after-hooks all execute exactly as they would for a regular publish. The concatenation preserves the batch's value proposition: each subscriber receives a single channel write containing all messages for that topic.

Flush should be called at most once; the batch is empty afterwards.

func (*PublishBatch) Publish added in v0.4.39

func (pb *PublishBatch) Publish(topic, msg string)

Publish buffers a message for all subscribers of the given topic.

func (*PublishBatch) PublishOOB added in v0.4.39

func (pb *PublishBatch) PublishOOB(topic string, fragments ...Fragment)

PublishOOB buffers OOB fragments for all subscribers of the given topic.

func (*PublishBatch) PublishOOBTo added in v0.4.39

func (pb *PublishBatch) PublishOOBTo(topic, scope string, fragments ...Fragment)

PublishOOBTo buffers OOB fragments for scoped subscribers matching the scope.

func (*PublishBatch) PublishTo added in v0.4.39

func (pb *PublishBatch) PublishTo(topic, scope, msg string)

PublishTo buffers a scoped message for subscribers matching the scope.

func (*PublishBatch) PublishWithID added in v0.4.52

func (pb *PublishBatch) PublishWithID(topic, id, msg string)

PublishWithID publishes a message with an associated event ID for Last-Event-ID resumption. Like PublishBatch.PublishWithTTL, this executes immediately rather than buffering.

func (*PublishBatch) PublishWithTTL added in v0.4.52

func (pb *PublishBatch) PublishWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption)

PublishWithTTL publishes a message with a TTL on the replay cache entry. Unlike other PublishBatch methods, this executes immediately rather than buffering because the TTL sweeper requires immediate processing.

type PublishFunc added in v0.4.41

type PublishFunc func(topic, msg string)

PublishFunc is the function signature for publish operations. Middleware wraps this to intercept, transform, or swallow publishes.

type PublisherFunc added in v0.4.18

type PublisherFunc func(ctx context.Context)

PublisherFunc is a long-running function that publishes messages to the broker. It receives the broker's context and should return when the context is cancelled.

type Rate added in v0.4.45

type Rate struct {
	// MaxPerSecond is a convenience field: converted to MinInterval internally.
	MaxPerSecond float64
	// MinInterval is the minimum time between deliveries to this subscriber.
	MinInterval time.Duration
}

Rate configures per-subscriber rate limiting. When a subscriber is rate-limited, messages published faster than the configured rate are held and only the most recent held message is delivered when the interval elapses (latest-wins). If both MaxPerSecond and MinInterval are set, MinInterval takes precedence.

type ReconnectCallback added in v0.4.46

type ReconnectCallback func(info ReconnectInfo)

ReconnectCallback is invoked when a subscriber reconnects with a Last-Event-ID header. It fires on ALL reconnections regardless of whether the replay log can satisfy the request.

type ReconnectInfo added in v0.4.46

type ReconnectInfo struct {
	// Topic is the topic the subscriber reconnected to.
	Topic string
	// SubscriberID is the caller-provided identifier (empty if not set via metadata).
	SubscriberID string
	// LastEventID is the Last-Event-ID sent by the client.
	LastEventID string
	// Gap is the time elapsed since the LastEventID was published. Zero if the
	// ID was not found in the replay log (e.g., it has rolled out).
	Gap time.Duration
	// MissedCount is the number of messages published after LastEventID that
	// the subscriber missed. Zero if the ID was not found in the replay log.
	MissedCount int
	// ReplayDelivered is the number of replay messages successfully enqueued
	// to the subscriber channel. This may be less than MissedCount if the
	// subscriber buffer was too small to hold all replay messages.
	ReplayDelivered int
	// ReplayDropped is the number of replay messages that could not be
	// enqueued because the subscriber buffer was full.
	ReplayDropped int
	// SendToSubscriber sends a message directly to this subscriber's channel.
	// The message is delivered as-is (raw SSE text). If the subscriber's buffer
	// is full the message is dropped silently.
	SendToSubscriber func(msg string)
}

ReconnectInfo provides context about a subscriber reconnection, including the gap duration and number of missed messages. It is passed to callbacks registered with SSEBroker.OnReconnect.

type RenderError added in v0.4.42

type RenderError struct {
	// Topic is the broker topic or scheduled publisher event associated with the error.
	Topic string

	// Section is the section name within a ScheduledPublisher (empty for broker-level errors).
	Section string

	// Err is the underlying error returned by the render function.
	Err error

	// Timestamp is when the error occurred.
	Timestamp time.Time

	// Count is the current consecutive failure count.
	Count int
}

RenderError contains structured information about a render failure. It implements the error interface and supports errors.Unwrap for the underlying error.

func (*RenderError) Error added in v0.4.42

func (e *RenderError) Error() string

Error implements the error interface.

func (*RenderError) Unwrap added in v0.4.42

func (e *RenderError) Unwrap() error

Unwrap returns the underlying error.

type RenderFunc added in v0.4.24

type RenderFunc func(ctx context.Context, buf *bytes.Buffer) error

RenderFunc renders content into the provided buffer. It receives the context (which is cancelled when the scheduled publisher stops) and a shared buffer to write HTML into. Multiple sections write to the same buffer in a single tick, so output should be self-contained fragments.

type ReplayEntry added in v0.4.25

type ReplayEntry struct {
	// ID is the SSE event identifier used for Last-Event-ID resumption.
	ID string
	// Msg is the raw message payload stored in the replay log.
	Msg string
	// ExpiresAt is when this entry should be purged from the replay cache.
	// A zero value means the entry does not expire.
	ExpiresAt time.Time
	// AutoRemoveID is the DOM element ID to send an OOB delete fragment for
	// when this entry expires. Empty means no auto-removal.
	AutoRemoveID string
	// PublishedAt records when the entry was originally published, used to
	// compute reconnection gap durations.
	PublishedAt time.Time
}

ReplayEntry pairs a message with its event ID for Last-Event-ID resumption support. When ExpiresAt is non-zero, the entry will be removed from the replay cache after that time (see SSEBroker.PublishWithTTL).

type ReplayGapCallback added in v0.4.39

type ReplayGapCallback func(sub *SubscriberInfo, lastEventID string)

ReplayGapCallback is invoked when a replay gap is detected for a subscriber. It receives the subscriber's info and the Last-Event-ID that could not be found in the replay log.

type ReplayStore added in v0.4.63

type ReplayStore interface {
	// Append stores a replay entry for a topic.
	Append(ctx context.Context, topic string, entry ReplayEntry) error

	// AfterID returns entries published after the given ID.
	// found indicates whether lastID exists in the store.
	// If found=false, the broker treats this as a gap.
	// Must not return expired entries (TTL filtering at read time).
	AfterID(ctx context.Context, topic, lastID string, limit int) (entries []ReplayEntry, found bool, err error)

	// Latest returns the most recent entries for a topic (for initial
	// subscribe replay). Must not return expired entries.
	Latest(ctx context.Context, topic string, limit int) ([]ReplayEntry, error)

	// DeleteTopic removes all replay entries for a topic.
	DeleteTopic(ctx context.Context, topic string) error

	// SetMaxEntries configures the maximum number of entries to retain for a
	// topic. When the limit is exceeded, the oldest entries are discarded.
	// A value of 0 or less removes all entries and the limit for the topic.
	SetMaxEntries(ctx context.Context, topic string, n int) error
}

ReplayStore is an abstraction for storing and retrieving replay entries. Implementations must be safe for concurrent use by multiple goroutines. The default in-memory implementation is MemoryReplayStore.

IDs are topic-scoped (not global). TTL filtering happens at read time: stores must not return expired entries from [ReplayStore.AfterID] or [ReplayStore.Latest].

type SSEBroker

type SSEBroker struct {
	// contains filtered or unexported fields
}

SSEBroker is a thread-safe, topic-based pub/sub message broker. Subscribers receive messages on a buffered channel and publishers fan out messages to all subscribers of a given topic. A zero-value SSEBroker is not usable; create one with NewSSEBroker.

Example (LifelineFallback)
package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	// Lifeline carries control-plane and fallback signals.
	ch, unsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control", "fallback")
	defer unsub()

	// Scoped panel stream -- active while viewing the panel.
	_, panelUnsub := broker.SubscribeScoped("panel-data", "user:1")

	// User navigates away -- panel stream torn down.
	panelUnsub()

	// Lifeline still delivers fallback/invalidation signals.
	broker.Publish("fallback", "data-stale")
	msg := <-ch
	fmt.Printf("topic=%s data=%s\n", msg.Topic, msg.Data)
}
Output:
topic=fallback data=data-stale
Example (LifelineHandoff)
package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	// Lifeline: one persistent connection for app-shell events.
	ch, unsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control")
	defer unsub()

	// User navigates to dashboard -- add topic dynamically.
	broker.AddTopic("app", "dashboard", true)

	// Control event confirms the topic was added.
	ctrl := <-ch
	fmt.Println("control event topic:", ctrl.Topic)

	// Both topics now deliver through the single lifeline channel.
	broker.Publish("dashboard", "chart-update")
	msg := <-ch
	fmt.Printf("topic=%s data=%s\n", msg.Topic, msg.Data)

	// User navigates away -- remove dashboard topic.
	broker.RemoveTopic("app", "dashboard", true)

	// Drain the removal control event.
	<-ch

}
Output:
control event topic: tavern-topics-changed
topic=dashboard data=chart-update
Example (LifelineReplay)
package main

import (
	"fmt"
	"strings"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.SetReplayPolicy("panel", 10)

	// Lifeline stays connected throughout.
	lifeline, lifelineUnsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control")
	defer lifelineUnsub()

	// Publish panel events with IDs while panel stream is down.
	broker.PublishWithID("panel", "e1", "update-1")
	broker.PublishWithID("panel", "e2", "update-2")

	// Panel stream reconnects with last known ID -- replay fills the gap.
	panelCh, panelUnsub := broker.SubscribeFromID("panel", "e1")
	defer panelUnsub()

	// Skip the reconnected control event.
	<-panelCh

	// Replayed message arrives.
	replayed := <-panelCh
	fmt.Println("replayed:", extractData(replayed))

	// Lifeline was never interrupted.
	broker.Publish("control", "still-alive")
	msg := <-lifeline
	fmt.Printf("lifeline: topic=%s data=%s\n", msg.Topic, msg.Data)
}

// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {

	var parts []string
	for _, line := range strings.Split(msg, "\n") {
		if !strings.HasPrefix(line, "id: ") {
			parts = append(parts, line)
		}
	}
	result := strings.Join(parts, "\n")
	return strings.TrimSpace(result)
}
Output:
replayed: update-2
lifeline: topic=control data=still-alive
Example (Pubsub)
package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

// Topic name conventions for dashboard and real-time UI applications. Your
// application may use any string as a topic name; these are provided as
// examples of consistent naming patterns.
const TopicSystemStats = "system-stats"

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe(TopicSystemStats)
	defer unsub()

	broker.Publish(TopicSystemStats, `{"cpu": 42}`)

	msg := <-ch
	fmt.Println(msg)
}
Output:
{"cpu": 42}

func NewSSEBroker

func NewSSEBroker(opts ...BrokerOption) *SSEBroker

NewSSEBroker creates a ready-to-use SSEBroker with no active topics or subscribers. It accepts optional BrokerOption values to override defaults.

func (*SSEBroker) AddTopic added in v0.4.46

func (b *SSEBroker) AddTopic(subscriberID, topic string, sendControl bool) bool

AddTopic adds a topic to an existing subscriber identified by subscriberID. The subscriber starts receiving messages from the new topic without reconnecting. If the subscriber is already subscribed to the topic, this is a no-op and returns false. Returns true if the topic was successfully added.

If sendControl is true, a control event with type "tavern-topics-changed" is sent on the subscriber's channel so the client can react (e.g., set up new SSE-swap targets).

func (*SSEBroker) AddTopicForScope added in v0.4.46

func (b *SSEBroker) AddTopicForScope(scope, topic string, sendControl bool) int

AddTopicForScope adds a topic to all subscribers with the matching scope. Returns the number of subscribers that had the topic added.

func (*SSEBroker) After added in v0.4.43

func (b *SSEBroker) After(topic string, fn func())

After registers a callback that fires asynchronously after a successful publish to the named topic. Multiple After hooks per topic are allowed and execute in registration order. Hooks run in a new goroutine and do not block the publish path.

After hooks that publish to other topics may trigger further After hooks. To prevent infinite cycles, the broker enforces a maximum nesting depth of 8 and will skip hooks that would re-enter a topic already in the current chain.

Calling After on a closed broker is a no-op.

Example

ExampleSSEBroker_After demonstrates After hooks that fire asynchronously after a successful publish. This is useful for triggering side effects like cache invalidation or dependent topic updates.

package main

import (
	"fmt"
	"sync"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	var done sync.WaitGroup
	done.Add(1)

	ch, unsub := broker.Subscribe("audit-log")
	defer unsub()

	broker.After("orders", func() {
		broker.Publish("audit-log", "order topic updated")
		done.Done()
	})

	broker.Publish("orders", `{"id": 1}`)
	done.Wait()

	msg := <-ch
	fmt.Println(msg)
}
Output:
order topic updated

func (*SSEBroker) Batch added in v0.4.39

func (b *SSEBroker) Batch() *PublishBatch

Batch creates a new PublishBatch that buffers publish calls against this broker. Call PublishBatch.Flush to send all buffered messages or PublishBatch.Discard to throw them away.

Example

ExampleSSEBroker_Batch demonstrates atomic multi-message publishing. All messages in a batch are concatenated per topic and delivered as a single channel write, reducing SSE frame overhead.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe("updates")
	defer unsub()

	batch := broker.Batch()
	batch.Publish("updates", "line1\n")
	batch.Publish("updates", "line2\n")
	batch.Flush()

	// The subscriber receives all batched messages as one concatenated write.
	msg := <-ch
	fmt.Print(msg)
}
Output:
line1
line2

func (*SSEBroker) ClearDedup added in v0.4.22

func (b *SSEBroker) ClearDedup(topic string)

ClearDedup resets the deduplication state for the given topic so the next call to SSEBroker.PublishIfChanged will always publish regardless of content.

func (*SSEBroker) ClearReplay added in v0.4.19

func (b *SSEBroker) ClearReplay(topic string)

ClearReplay removes the cached replay message for the given topic. Future subscribers will no longer receive a replayed message on connect.

func (*SSEBroker) Close

func (b *SSEBroker) Close()

Close shuts down the broker. It first waits for all publisher goroutines started via SSEBroker.RunPublisher to return, then closes all subscriber channels and removes all topics. After Close returns, any pending reads on subscriber channels will receive the zero value. It is safe to call Close while other goroutines are publishing or subscribing; however, no new messages will be delivered after Close returns.

func (*SSEBroker) DefineGroup added in v0.4.40

func (b *SSEBroker) DefineGroup(name string, topics []string)

DefineGroup registers a named static topic group. The group can later be served via SSEBroker.GroupHandler. Defining a group with the same name again replaces the previous definition.

func (*SSEBroker) Disconnect added in v0.4.34

func (b *SSEBroker) Disconnect(topic, subscriberID string) bool

Disconnect closes the subscriber channel with the given ID on the given topic. Returns true if the subscriber was found and disconnected.

func (*SSEBroker) DynamicGroup added in v0.4.40

func (b *SSEBroker) DynamicGroup(name string, fn func(r *http.Request) []string)

DynamicGroup registers a named dynamic topic group. The provided function is evaluated per-request to determine which topics a given connection should subscribe to. This enables per-request authorization — different users can receive different topic sets from the same endpoint.

func (*SSEBroker) DynamicGroupHandler added in v0.4.40

func (b *SSEBroker) DynamicGroupHandler(name string, opts ...SSEHandlerOption) http.Handler

DynamicGroupHandler returns an http.Handler that streams SSE messages for topics determined per-request by the dynamic group's function. Each message is formatted with the topic as the SSE event type.

DynamicGroupHandler accepts the same SSEHandlerOption values as SSEBroker.SSEHandler. If the group name has not been defined, the handler responds with 404.

Example

ExampleSSEBroker_DynamicGroupHandler demonstrates a dynamic topic group where the topics are resolved per-request. This enables per-user authorization so different users receive different topic sets from the same endpoint.

package main

import (
	"fmt"
	"net/http"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.DynamicGroup("user-feed", func(r *http.Request) []string {
		userID := r.URL.Query().Get("user")
		return []string{"global-feed", "user/" + userID + "/notifications"}
	})

	handler := broker.DynamicGroupHandler("user-feed")
	mux := http.NewServeMux()
	mux.Handle("/sse/feed", handler)

	fmt.Println("dynamic handler registered")
}
Output:
dynamic handler registered

func (*SSEBroker) GroupHandler added in v0.4.40

func (b *SSEBroker) GroupHandler(name string, opts ...SSEHandlerOption) http.Handler

GroupHandler returns an http.Handler that streams SSE messages for all topics in the named static group. Each message is formatted with the topic as the SSE event type and the published data as the data field.

GroupHandler accepts the same SSEHandlerOption values as SSEBroker.SSEHandler. If the group name has not been defined, the handler responds with 404.

Example

ExampleSSEBroker_GroupHandler demonstrates a static topic group that multiplexes several topics onto a single SSE connection. This is useful when a dashboard page needs data from multiple topics without opening separate EventSource connections.

package main

import (
	"fmt"
	"net/http"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.DefineGroup("dashboard", []string{"metrics", "alerts", "status"})

	handler := broker.GroupHandler("dashboard")
	mux := http.NewServeMux()
	mux.Handle("/sse/dashboard", handler)

	fmt.Println("handler registered")
}
Output:
handler registered

func (*SSEBroker) HasSubscribers

func (b *SSEBroker) HasSubscribers(topic string) bool

HasSubscribers reports whether the given topic has at least one active subscriber, including both unscoped and scoped subscribers. This is useful for skipping expensive serialization when no clients are listening.

func (*SSEBroker) Metrics added in v0.4.31

func (b *SSEBroker) Metrics() BrokerMetrics

Metrics returns a point-in-time snapshot of per-topic and aggregate metrics. Returns an empty BrokerMetrics if metrics are not enabled (see WithMetrics).

func (*SSEBroker) NewScheduledPublisher added in v0.4.24

func (b *SSEBroker) NewScheduledPublisher(event string, opts ...ScheduledPublisherOption) *ScheduledPublisher

NewScheduledPublisher creates a publisher that publishes to the given event/topic on the broker.

func (*SSEBroker) NotifyMutate added in v0.4.43

func (b *SSEBroker) NotifyMutate(resource string, event MutationEvent)

NotifyMutate triggers all handlers registered via SSEBroker.OnMutate for the named resource. Handlers execute synchronously in registration order in the caller's goroutine. If no handlers are registered for the resource, this is a no-op.

func (*SSEBroker) Observability added in v0.4.46

func (b *SSEBroker) Observability() *observabilityState

Observability returns the observability handle for the broker. Returns nil if observability is not enabled.

func (*SSEBroker) OnBackpressureTierChange added in v0.4.46

func (b *SSEBroker) OnBackpressureTierChange(fn func(sub *SubscriberInfo, oldTier, newTier BackpressureTier))

OnBackpressureTierChange sets a callback that fires whenever a subscriber transitions between backpressure tiers. The callback receives the subscriber info and the old and new tiers. The callback runs in its own goroutine.

func (*SSEBroker) OnFirstSubscriber added in v0.4.23

func (b *SSEBroker) OnFirstSubscriber(topic string, fn func(topic string))

OnFirstSubscriber registers a callback that fires when the given topic goes from zero to one total subscribers (counting both unscoped and scoped). The callback runs in its own goroutine and does not block Subscribe. Multiple hooks per topic are allowed and all will fire. Hooks persist across subscriber cycles. Calling this on a closed broker is a no-op.

Example

ExampleSSEBroker_OnFirstSubscriber demonstrates lifecycle hooks that fire when the first subscriber joins a topic and when the last one leaves.

package main

import (
	"fmt"
	"sync"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	var firstDone, lastDone sync.WaitGroup
	firstDone.Add(1)
	lastDone.Add(1)

	broker.OnFirstSubscriber("prices", func(topic string) {
		fmt.Printf("first subscriber on %s\n", topic)
		firstDone.Done()
	})
	broker.OnLastUnsubscribe("prices", func(topic string) {
		fmt.Printf("last subscriber left %s\n", topic)
		lastDone.Done()
	})

	_, unsub := broker.Subscribe("prices")
	firstDone.Wait()

	unsub()
	lastDone.Wait()
}
Output:
first subscriber on prices
last subscriber left prices

func (*SSEBroker) OnLastUnsubscribe added in v0.4.23

func (b *SSEBroker) OnLastUnsubscribe(topic string, fn func(topic string))

OnLastUnsubscribe registers a callback that fires when the given topic goes from one to zero total subscribers (counting both unscoped and scoped). The callback runs in its own goroutine and does not block the unsubscribe call. Multiple hooks per topic are allowed and all will fire. Hooks persist across subscriber cycles. Calling this on a closed broker is a no-op.

func (*SSEBroker) OnMutate added in v0.4.43

func (b *SSEBroker) OnMutate(resource string, fn func(MutationEvent))

OnMutate registers a handler for the named resource. The handler fires when SSEBroker.NotifyMutate is called with the same resource name. Multiple handlers per resource are allowed and execute in registration order.

Resources are logical entities (e.g., "orders", "users") rather than topic names. This decouples the mutation signal from the specific topics that get updated.

Calling OnMutate on a closed broker is a no-op.

Example

ExampleSSEBroker_OnMutate demonstrates the OnMutate/NotifyMutate pattern for decoupling resource mutations from topic updates.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe("order-updates")
	defer unsub()

	broker.OnMutate("orders", func(e tavern.MutationEvent) {
		broker.Publish("order-updates", fmt.Sprintf("order %s changed", e.ID))
	})

	broker.NotifyMutate("orders", tavern.MutationEvent{ID: "42", Data: "shipped"})

	msg := <-ch
	fmt.Println(msg)
}
Output:
order 42 changed

func (*SSEBroker) OnPublishDrop added in v0.4.51

func (b *SSEBroker) OnPublishDrop(fn func(topic string, droppedForCount int))

OnPublishDrop registers a callback that fires each time a message is dropped during fan-out because a subscriber's channel buffer is full. The callback receives the topic name and the number of subscribers for whom delivery failed. Only one callback is supported; subsequent calls replace the previous one.

The callback runs synchronously in the publish goroutine — keep it fast.

func (*SSEBroker) OnReconnect added in v0.4.46

func (b *SSEBroker) OnReconnect(topic string, fn ReconnectCallback)

OnReconnect registers a callback that fires when a subscriber reconnects with a Last-Event-ID header for the given topic. Unlike SSEBroker.OnReplayGap, which only fires when the replay log cannot satisfy the request, OnReconnect fires on every reconnection. The callback runs in its own goroutine and does not block the subscription. Multiple callbacks per topic are allowed. Calling this on a closed broker is a no-op.

func (*SSEBroker) OnRenderError added in v0.4.42

func (b *SSEBroker) OnRenderError(fn func(*RenderError))

OnRenderError registers a callback that fires when a render function fails. This applies to lazy OOB renders and scheduled section renders. Only one callback is supported; subsequent calls replace the previous callback. The callback receives a *RenderError with structured information about the failure. The callback runs synchronously in the goroutine where the error occurred — avoid blocking operations.

func (*SSEBroker) OnReplayGap added in v0.4.39

func (b *SSEBroker) OnReplayGap(topic string, fn ReplayGapCallback)

OnReplayGap registers a callback that fires when a subscriber reconnects with a Last-Event-ID that is no longer present in the replay log for the given topic. The callback runs in its own goroutine and does not block the subscription. Multiple callbacks per topic are allowed and all will fire. Like SSEBroker.SetReplayGapPolicy, gap callbacks are only meaningful when the topic uses ID-backed replay (see SetReplayGapPolicy for details). Calling this on a closed broker is a no-op.

func (*SSEBroker) Publish

func (b *SSEBroker) Publish(topic, msg string)

Publish fans out msg to every subscriber of the given topic. It is non-blocking: if a subscriber's channel buffer is full, the message is silently dropped for that subscriber rather than blocking the publisher. Publishing to a topic with no subscribers is a no-op.

func (*SSEBroker) PublishBlocking added in v0.4.51

func (b *SSEBroker) PublishBlocking(topic, msg string, timeout time.Duration) error

PublishBlocking behaves like SSEBroker.Publish but blocks up to timeout for each subscriber whose channel buffer is full. If any subscriber cannot accept the message within the timeout, ErrPublishTimeout is returned. A timeout of zero falls back to non-blocking behavior (equivalent to SSEBroker.Publish).

func (*SSEBroker) PublishBlockingTo added in v0.4.51

func (b *SSEBroker) PublishBlockingTo(topic, scope, msg string, timeout time.Duration) error

PublishBlockingTo behaves like SSEBroker.PublishTo but blocks up to timeout for each matching scoped subscriber whose channel buffer is full. Returns ErrPublishTimeout if any delivery times out.

func (*SSEBroker) PublishDebounced added in v0.4.24

func (b *SSEBroker) PublishDebounced(topic, msg string, after time.Duration)

PublishDebounced publishes msg to the topic after the given duration of quiet. If called again for the same topic before the duration elapses, the timer resets and only the latest message is published. This is useful for rapid state changes (typing indicators, slider drags) where intermediate states are noise. This method is safe for concurrent use.

Example

ExampleSSEBroker_PublishDebounced demonstrates debounced publishing where only the last message in a rapid sequence is delivered after a quiet period.

package main

import (
	"fmt"
	"time"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe("search")
	defer unsub()

	// Simulate rapid typing — only the final value should be published.
	broker.PublishDebounced("search", "h", 50*time.Millisecond)
	broker.PublishDebounced("search", "he", 50*time.Millisecond)
	broker.PublishDebounced("search", "hel", 50*time.Millisecond)
	broker.PublishDebounced("search", "hello", 50*time.Millisecond)

	msg := <-ch
	fmt.Println(msg)
}
Output:
hello

func (*SSEBroker) PublishDrops added in v0.3.0

func (b *SSEBroker) PublishDrops() int64

PublishDrops returns the cumulative number of messages that were dropped because a subscriber's channel buffer was full.

func (*SSEBroker) PublishIfChanged added in v0.4.22

func (b *SSEBroker) PublishIfChanged(topic, msg string) bool

PublishIfChanged publishes msg to the given topic only when it differs from the last message published via PublishIfChanged for that topic. It returns true if the message was published (content changed) or false if it was skipped (identical to the previous message). Comparison is done via an FNV-64a hash of the message content.

The deduplication state is per-topic and independent of SSEBroker.Publish. Use SSEBroker.ClearDedup to reset the stored hash for a topic.

func (*SSEBroker) PublishIfChangedOOB added in v0.4.29

func (b *SSEBroker) PublishIfChangedOOB(topic string, fragments ...Fragment) bool

PublishIfChangedOOB renders the given fragments and publishes the result to the topic only if it differs from the last message published via SSEBroker.PublishIfChanged for that topic. Returns true if published (content changed), false if skipped (identical).

func (*SSEBroker) PublishIfChangedOOBTo added in v0.4.29

func (b *SSEBroker) PublishIfChangedOOBTo(topic, scope string, fragments ...Fragment) bool

PublishIfChangedOOBTo renders the given fragments and publishes the result only to scoped subscribers of the topic whose scope matches, and only if the content differs from the last publish for that topic+scope. Returns true if published, false if skipped.

func (*SSEBroker) PublishIfChangedTo added in v0.4.29

func (b *SSEBroker) PublishIfChangedTo(topic, scope, msg string) bool

PublishIfChangedTo publishes msg to scoped subscribers of the given topic only when it differs from the last message published via PublishIfChangedTo for that topic+scope combination. Returns true if published, false if skipped. Comparison is done via an FNV-64a hash of the message content.

func (*SSEBroker) PublishIfChangedWithTTL added in v0.4.44

func (b *SSEBroker) PublishIfChangedWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption) bool

PublishIfChangedWithTTL combines deduplication with TTL. The message is published only if it differs from the last PublishIfChanged value for the topic, and the replay cache entry expires after the given TTL. Returns true if published (content changed), false if skipped.

func (*SSEBroker) PublishLazyIfChangedOOB added in v0.4.33

func (b *SSEBroker) PublishLazyIfChangedOOB(topic string, renderFn func() []Fragment) bool

PublishLazyIfChangedOOB calls renderFn only if the topic has subscribers, then publishes the rendered fragments only if the content differs from the last publish. Combines the subscriber guard with deduplication. Returns true if published (content changed), false if skipped (no subscribers, no fragments, or identical content).

func (*SSEBroker) PublishLazyIfChangedOOBTo added in v0.4.33

func (b *SSEBroker) PublishLazyIfChangedOOBTo(topic, scope string, renderFn func() []Fragment) bool

PublishLazyIfChangedOOBTo calls renderFn only if the topic has subscribers, then publishes the rendered fragments to scoped subscribers only if the content differs from the last publish for that topic+scope.

func (*SSEBroker) PublishLazyOOB added in v0.4.33

func (b *SSEBroker) PublishLazyOOB(topic string, renderFn func() []Fragment)

PublishLazyOOB calls renderFn only if the topic has subscribers, then publishes the rendered fragments. This avoids expensive rendering (DB queries, template execution) when nobody is listening. If renderFn returns no fragments, no message is published.

func (*SSEBroker) PublishLazyOOBTo added in v0.4.33

func (b *SSEBroker) PublishLazyOOBTo(topic, scope string, renderFn func() []Fragment)

PublishLazyOOBTo calls renderFn only if the topic has subscribers, then publishes the rendered fragments to scoped subscribers matching the scope.

func (*SSEBroker) PublishOOB added in v0.3.2

func (b *SSEBroker) PublishOOB(topic string, fragments ...Fragment)

PublishOOB renders the given fragments and publishes them as a single SSE event.

func (*SSEBroker) PublishOOBTo added in v0.4.0

func (b *SSEBroker) PublishOOBTo(topic, scope string, fragments ...Fragment)

PublishOOBTo renders the given fragments and publishes them only to scoped subscribers whose scope matches.

func (*SSEBroker) PublishOOBWithTTL added in v0.4.44

func (b *SSEBroker) PublishOOBWithTTL(topic string, ttl time.Duration, fragments ...Fragment)

PublishOOBWithTTL renders the given fragments and publishes them with a TTL on the replay cache entry. See SSEBroker.PublishWithTTL for TTL semantics.

func (*SSEBroker) PublishThrottled added in v0.4.24

func (b *SSEBroker) PublishThrottled(topic, msg string, interval time.Duration)

PublishThrottled publishes msg to the topic at most once per interval. The first call publishes immediately. Subsequent calls within the interval are held; when the interval elapses, the most recent held message is published (latest-wins). This guarantees bounded latency for the first message while rate-limiting subsequent ones. This method is safe for concurrent use.

Example

ExampleSSEBroker_PublishThrottled demonstrates throttled publishing where the first message publishes immediately and subsequent messages within the interval are rate-limited.

package main

import (
	"fmt"
	"time"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.Subscribe("slider")
	defer unsub()

	// First call publishes immediately.
	broker.PublishThrottled("slider", "first", 100*time.Millisecond)

	msg := <-ch
	fmt.Println(msg)
}
Output:
first

func (*SSEBroker) PublishTo added in v0.4.0

func (b *SSEBroker) PublishTo(topic, scope, msg string)

PublishTo fans out msg only to scoped subscribers of the given topic whose scope matches. It is non-blocking: if a subscriber's channel buffer is full, the message is silently dropped. Publishing to a topic or scope with no matching subscribers is a no-op.

func (*SSEBroker) PublishToWithTTL added in v0.4.44

func (b *SSEBroker) PublishToWithTTL(topic, scope, msg string, ttl time.Duration, opts ...TTLOption)

PublishToWithTTL publishes msg to scoped subscribers with a TTL on the replay cache entry. See SSEBroker.PublishWithTTL for TTL semantics.

func (*SSEBroker) PublishWithID added in v0.4.25

func (b *SSEBroker) PublishWithID(topic, id, msg string)

PublishWithID publishes msg to the topic with an associated event ID. The message is cached in the replay log for Last-Event-ID resumption. The replay log size is controlled by SetReplayPolicy (default 1).

Example

ExampleSSEBroker_PublishWithID demonstrates publishing messages with event IDs for Last-Event-ID resumption. When a client reconnects, it can resume from where it left off using SubscribeFromID.

package main

import (
	"fmt"
	"strings"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.SetReplayPolicy("orders", 10)

	ch, unsub := broker.Subscribe("orders")
	defer unsub()

	broker.PublishWithID("orders", "evt-1", `{"order": "A"}`)

	// Live subscriber receives the message with the injected id: field.
	msg := <-ch
	fmt.Println(extractData(msg))
}

// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {

	var parts []string
	for _, line := range strings.Split(msg, "\n") {
		if !strings.HasPrefix(line, "id: ") {
			parts = append(parts, line)
		}
	}
	result := strings.Join(parts, "\n")
	return strings.TrimSpace(result)
}
Output:
{"order": "A"}

func (*SSEBroker) PublishWithReplay added in v0.4.19

func (b *SSEBroker) PublishWithReplay(topic, msg string)

PublishWithReplay behaves like SSEBroker.Publish but also caches msg so that future subscribers of the topic immediately receive it on connect. Only the most recent message per topic is retained. Use SSEBroker.ClearReplay to remove the cached message for a topic.

func (*SSEBroker) PublishWithTTL added in v0.4.44

func (b *SSEBroker) PublishWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption)

PublishWithTTL behaves like SSEBroker.PublishWithReplay but marks the replay cache entry with a time-to-live. After the TTL expires, the entry is removed from the replay cache so new subscribers do not see stale messages. The message is delivered immediately to all current subscribers regardless of the TTL.

Example

ExampleSSEBroker_PublishWithTTL demonstrates ephemeral messages that expire from the replay cache after a duration. Current subscribers receive the message immediately, but new subscribers who connect after the TTL elapses will not see it.

package main

import (
	"fmt"
	"time"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker(tavern.WithMessageTTLSweep(10 * time.Millisecond))
	defer broker.Close()

	broker.SetReplayPolicy("alerts", 10)

	ch, unsub := broker.Subscribe("alerts")
	defer unsub()

	broker.PublishWithTTL("alerts", "temporary alert", 50*time.Millisecond)

	msg := <-ch
	fmt.Println(msg)
}
Output:
temporary alert

func (*SSEBroker) RemoveTopic added in v0.4.46

func (b *SSEBroker) RemoveTopic(subscriberID, topic string, sendControl bool) bool

RemoveTopic removes a topic from an existing subscriber identified by subscriberID. The subscriber stops receiving messages from the topic. Returns true if the topic was found and removed. Lifecycle hooks (OnLastUnsubscribe) fire if this was the last subscriber on the topic.

If sendControl is true, a control event with type "tavern-topics-changed" is sent on the subscriber's channel.

func (*SSEBroker) RemoveTopicForScope added in v0.4.46

func (b *SSEBroker) RemoveTopicForScope(scope, topic string, sendControl bool) int

RemoveTopicForScope removes a topic from all subscribers with the matching scope. Returns the number of subscribers that had the topic removed.

func (*SSEBroker) RunPublisher added in v0.4.18

func (b *SSEBroker) RunPublisher(ctx context.Context, fn PublisherFunc)

RunPublisher launches fn in a new goroutine with panic recovery. If fn panics, the panic is recovered and logged (when a logger is configured via WithLogger). The goroutine is tracked by the broker's internal wait group so that SSEBroker.Close blocks until all publishers have returned.

fn receives the provided context and should return when the context is cancelled. Callers typically pass a context derived from the application's shutdown signal.

func (*SSEBroker) SSEHandler added in v0.4.34

func (b *SSEBroker) SSEHandler(topic string, opts ...SSEHandlerOption) http.Handler

SSEHandler returns an http.Handler that streams SSE messages for the given topic. It handles Content-Type headers, Last-Event-ID resumption via SSEBroker.SubscribeFromID, and the streaming select loop.

The default writer calls fmt.Fprint followed by http.Flusher.Flush for each message. Override with WithSSEWriter for custom formatting.

// Standard library
mux.Handle("/sse/dashboard", broker.SSEHandler("dashboard"))

// Echo
e.GET("/sse/dashboard", echo.WrapHandler(broker.SSEHandler("dashboard")))

// Custom writer (e.g., htmx-go)
mux.Handle("/sse", broker.SSEHandler("events",
    tavern.WithSSEWriter(func(w http.ResponseWriter, msg string) error {
        return htmx.WriteSSE(w, msg)
    }),
))

func (*SSEBroker) SetBundleOnReconnect added in v0.4.46

func (b *SSEBroker) SetBundleOnReconnect(topic string, bundle bool)

SetBundleOnReconnect configures whether replay messages should be bundled into a single SSE write when a subscriber reconnects with a Last-Event-ID for the given topic. Bundling reduces DOM swap churn on the client by delivering all missed messages as one write.

func (*SSEBroker) SetOrdered added in v0.4.50

func (b *SSEBroker) SetOrdered(topic string, enabled bool)

SetOrdered marks or unmarks a topic as ordered. When a topic is ordered, concurrent publishes are serialized through a per-topic mutex so that all subscribers observe messages in the same order. Non-ordered topics (the default) have zero additional synchronization overhead. This method is safe for concurrent use.

Call with enabled=true before publishing to guarantee ordering. Call with enabled=false to remove the constraint. Toggling while publishes are in-flight is safe but ordering is only guaranteed while the topic is marked as ordered.

func (*SSEBroker) SetReplayGapPolicy added in v0.4.39

func (b *SSEBroker) SetReplayGapPolicy(topic string, strategy GapStrategy, snapshotFn func() string)

SetReplayGapPolicy configures the gap strategy and optional snapshot function for the given topic. When a subscriber reconnects with a Last-Event-ID that has rolled out of the replay log:

  • GapSilent: no special action (default, backwards compatible).
  • GapFallbackToSnapshot: call snapshotFn and deliver the result as the first message to the subscriber, preceded by a "event: tavern-replay-gap" control event.

The snapshotFn parameter is only used with GapFallbackToSnapshot and may be nil for other strategies.

Gap detection requires ID-backed replay state: the topic must receive messages via SSEBroker.PublishWithID (or variants like PublishWithTTL) so that a replay log with event IDs exists. Without ID-backed publishes, subscribers never receive event IDs and Last-Event-ID reconnection is not meaningful. Calling SetReplayGapPolicy on a topic that only uses plain SSEBroker.Publish has no effect at runtime.

If a *slog.Logger is configured via WithLogger, a warning is logged when this method is called for a topic that has no replay log entries and no external ReplayStore.

func (*SSEBroker) SetReplayPolicy added in v0.4.24

func (b *SSEBroker) SetReplayPolicy(topic string, n int)

SetReplayPolicy sets how many recent messages to cache for replay on the given topic. New subscribers receive up to n cached messages in order before live messages. Use n=0 to disable replay for the topic.

Example

ExampleSSEBroker_SetReplayPolicy demonstrates configuring replay so that new subscribers receive recently cached messages on connect.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.SetReplayPolicy("news", 3)

	// Publish before any subscriber exists.
	broker.PublishWithReplay("news", "headline-1")
	broker.PublishWithReplay("news", "headline-2")
	broker.PublishWithReplay("news", "headline-3")

	// New subscriber receives the cached messages.
	ch, unsub := broker.Subscribe("news")
	defer unsub()

	for i := 0; i < 3; i++ {
		fmt.Println(<-ch)
	}
}
Output:
headline-1
headline-2
headline-3

func (*SSEBroker) SetRetry added in v0.4.34

func (b *SSEBroker) SetRetry(topic string, d time.Duration)

SetRetry sends an SSE retry directive to all subscribers of the given topic, including both unscoped and scoped subscribers. The browser's EventSource stores this value and uses it for the next reconnect attempt. Call before SSEBroker.Close in a graceful shutdown sequence to prevent clients from thundering-herding against new pods.

func (*SSEBroker) SetRetryAll added in v0.4.34

func (b *SSEBroker) SetRetryAll(d time.Duration)

SetRetryAll sends an SSE retry directive to all subscribers across all topics, including both unscoped and scoped subscribers. This is a convenience for graceful shutdown scenarios where every connected client should back off before reconnecting.

func (*SSEBroker) SetSimplifiedRenderer added in v0.4.46

func (b *SSEBroker) SetSimplifiedRenderer(topic string, fn func(string) string)

SetSimplifiedRenderer registers a function that produces lightweight content for the given topic. When a subscriber is in the simplify tier, the renderer is applied to the message before delivery. If no renderer is registered, the original message is delivered with no transformation.

func (*SSEBroker) Stats added in v0.3.0

func (b *SSEBroker) Stats() BrokerStats

Stats returns a point-in-time BrokerStats snapshot. It is a convenience method that combines SSEBroker.TopicCounts, SSEBroker.SubscriberCount, and SSEBroker.PublishDrops into a single lock acquisition.

Example
package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

// Topic name conventions for dashboard and real-time UI applications. Your
// application may use any string as a topic name; these are provided as
// examples of consistent naming patterns.
const (
	TopicSystemStats = "system-stats"

	TopicActivityFeed = "activity-feed"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	_, unsub1 := broker.Subscribe(TopicSystemStats)
	defer unsub1()
	_, unsub2 := broker.Subscribe(TopicActivityFeed)
	defer unsub2()

	stats := broker.Stats()
	fmt.Printf("topics=%d subscribers=%d drops=%d\n", stats.Topics, stats.Subscribers, stats.PublishDrops)
}
Output:
topics=2 subscribers=2 drops=0

func (*SSEBroker) Subscribe

func (b *SSEBroker) Subscribe(topic string) (msgs <-chan string, unsubscribe func())

Subscribe registers a new subscriber for the given topic and returns a read-only channel that will receive published messages, along with an unsubscribe function. The caller must invoke the returned function when done to release resources and close the channel. Calling the unsubscribe function more than once is safe and has no effect after the first call.

The returned channel is buffered (default capacity 10, configurable via WithBufferSize). If the subscriber does not drain the channel fast enough, messages will be dropped by SSEBroker.Publish.

func (*SSEBroker) SubscribeFromID added in v0.4.25

func (b *SSEBroker) SubscribeFromID(topic, lastEventID string) (msgs <-chan string, unsubscribe func())

SubscribeFromID subscribes to a topic and replays all cached messages with IDs after lastEventID. If lastEventID is empty, all cached messages are replayed (same as Subscribe). If lastEventID is not found in the replay log, no replay occurs (gap too large) and only live messages are delivered.

This implements the server side of the SSE Last-Event-ID resumption protocol. The HTTP handler should read the Last-Event-ID header from the request and pass it here.

Example

ExampleSSEBroker_SubscribeFromID demonstrates replaying cached messages for a new subscriber when no Last-Event-ID is provided. When lastEventID is empty, all cached replay messages are delivered.

package main

import (
	"fmt"
	"strings"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	broker.SetReplayPolicy("chat", 5)

	broker.PublishWithID("chat", "msg-1", "hello")
	broker.PublishWithID("chat", "msg-2", "world")

	// Empty lastEventID replays all cached messages.
	ch, unsub := broker.SubscribeFromID("chat", "")
	defer unsub()

	m1 := <-ch
	m2 := <-ch
	fmt.Println(extractData(m1))
	fmt.Println(extractData(m2))
}

// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {

	var parts []string
	for _, line := range strings.Split(msg, "\n") {
		if !strings.HasPrefix(line, "id: ") {
			parts = append(parts, line)
		}
	}
	result := strings.Join(parts, "\n")
	return strings.TrimSpace(result)
}
Output:
hello
world

func (*SSEBroker) SubscribeFromIDWith added in v0.4.76

func (b *SSEBroker) SubscribeFromIDWith(topic, lastEventID string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())

SubscribeFromIDWith creates a composable resume-aware subscription. When lastEventID is empty it behaves like SSEBroker.SubscribeWith with replay-cache replay. When lastEventID is non-empty it replays messages from the ID-backed replay log after that ID, emits a tavern-reconnected control event, and handles gap fallback consistently with SSEBroker.SubscribeFromID.

Option semantics:

  • SubWithFilter: applies uniformly to replay AND live messages. Control events (tavern-reconnected, tavern-replay-gap, tavern-replay-truncated) always bypass the filter.
  • SubWithMeta: applied the same as live subscriptions.
  • SubWithSnapshot: applied ONLY on fresh subscribe (lastEventID is empty); never delivered on successful resume. Gap-fallback snapshot is a separate mechanism configured via SSEBroker.SetReplayGapPolicy.
  • SubWithRate: applied to LIVE delivery only. Replay messages are delivered directly and are NOT rate-limited.
  • SubWithScope: sets the subscriber scope for live scope filtering. The replay log is scope-less, so any replayed messages are delivered regardless of scope. Note that SSEBroker.PublishWithID publishes only to unscoped subscribers, so a scoped resume subscriber will only receive live messages via scope-aware publish paths such as SSEBroker.PublishTo.

func (*SSEBroker) SubscribeGlob added in v0.4.46

func (b *SSEBroker) SubscribeGlob(pattern string) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeGlob registers a subscriber for all topics matching the given glob pattern and returns a channel that receives TopicMessage values tagged with the actual publish topic. The pattern uses "/" as the topic separator:

  • "*" matches exactly one segment
  • "**" matches zero or more segments (any depth)

The returned unsubscribe function removes the glob subscriber and closes the channel. It is safe to call more than once.

Example

ExampleSSEBroker_SubscribeGlob demonstrates hierarchical topic patterns using glob subscriptions. The "*" wildcard matches a single segment and "**" matches zero or more segments.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	// Subscribe to all topics under "sensors/" with one level of nesting.
	ch, unsub := broker.SubscribeGlob("sensors/*")
	defer unsub()

	broker.Publish("sensors/temperature", `{"value": 22.5}`)
	broker.Publish("sensors/humidity", `{"value": 60}`)
	// This won't match because "sensors/floor/1" has two segments after "sensors".
	broker.Publish("sensors/floor/1", `{"value": 3}`)

	msg1 := <-ch
	msg2 := <-ch
	fmt.Printf("topic=%s data=%s\n", msg1.Topic, msg1.Data)
	fmt.Printf("topic=%s data=%s\n", msg2.Topic, msg2.Data)
}
Output:
topic=sensors/temperature data={"value": 22.5}
topic=sensors/humidity data={"value": 60}

func (*SSEBroker) SubscribeGlobScoped added in v0.4.46

func (b *SSEBroker) SubscribeGlobScoped(pattern, scope string) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeGlobScoped registers a scoped glob subscriber. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered.

func (*SSEBroker) SubscribeGlobWith added in v0.4.52

func (b *SSEBroker) SubscribeGlobWith(pattern string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeGlobWith subscribes to a glob pattern with composable options.

func (*SSEBroker) SubscribeMulti added in v0.4.34

func (b *SSEBroker) SubscribeMulti(topics ...string) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeMulti subscribes to multiple topics and returns a single channel that receives TopicMessage values tagged with their source topic. The returned unsubscribe function removes the subscriber from all topics at once. Each topic counts toward its own subscriber total (lifecycle hooks fire correctly).

This eliminates the need for reflect.Select when a single SSE connection serves multiple topics.

Example

ExampleSSEBroker_SubscribeMulti demonstrates subscribing to multiple topics with a single channel. Each received message includes the source topic, eliminating the need for reflect.Select.

package main

import (
	"fmt"
	"sort"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	ch, unsub := broker.SubscribeMulti("cpu", "memory", "disk")
	defer unsub()

	broker.Publish("memory", "85%")
	broker.Publish("cpu", "42%")
	broker.Publish("disk", "67%")

	// Collect all three messages.
	msgs := make([]string, 3)
	for i := 0; i < 3; i++ {
		m := <-ch
		msgs[i] = fmt.Sprintf("%s=%s", m.Topic, m.Data)
	}
	sort.Strings(msgs)
	for _, m := range msgs {
		fmt.Println(m)
	}
}
Output:
cpu=42%
disk=67%
memory=85%

func (*SSEBroker) SubscribeMultiFromID added in v0.4.76

func (b *SSEBroker) SubscribeMultiFromID(topics []string, lastEventID string) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeMultiFromID subscribes to multiple topics with shared Last-Event-ID replay/resume semantics and returns a single channel of TopicMessage values tagged with their source topic. Each topic independently replays from its ID-backed replay log after lastEventID (or from the current cache if lastEventID is empty), then continues with live messages. The returned unsubscribe function closes all inner subscriptions at once.

Semantics:

  • The same lastEventID is applied uniformly to every topic. If a topic's replay log does not contain the ID, that topic's gap handling (reconnect callbacks, gap strategy) runs as configured via SetReplayGapPolicy.
  • Ordering is not guaranteed across topics. Within a single topic, messages preserve their published order.
  • Calling the returned unsubscribe closes all inner channels; the output channel is closed once all fan-in goroutines exit.

This is the multi-topic counterpart to SSEBroker.SubscribeFromID and mirrors SSEBroker.SubscribeMulti's fan-in structure.

func (*SSEBroker) SubscribeMultiWith added in v0.4.52

func (b *SSEBroker) SubscribeMultiWith(topics []string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeMultiWith subscribes to multiple topics with composable options. Options like filter and rate are applied uniformly to all topics.

func (*SSEBroker) SubscribeMultiWithMeta added in v0.4.46

func (b *SSEBroker) SubscribeMultiWithMeta(meta SubscribeMeta, topics ...string) (msgs <-chan TopicMessage, unsubscribe func())

SubscribeMultiWithMeta subscribes to multiple topics with metadata and returns a managed multi-subscription that supports dynamic topic changes via SSEBroker.AddTopic and SSEBroker.RemoveTopic.

func (*SSEBroker) SubscribeScoped added in v0.4.0

func (b *SSEBroker) SubscribeScoped(topic, scope string) (msgs <-chan string, unsubscribe func())

SubscribeScoped registers a subscriber with a scope key for the given topic. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered. The returned unsubscribe function releases resources and closes the channel; it is safe to call more than once.

func (*SSEBroker) SubscribeScopedWithCoalescing added in v0.4.48

func (b *SSEBroker) SubscribeScopedWithCoalescing(topic, scope string) (msgs <-chan string, unsubscribe func())

SubscribeScopedWithCoalescing registers a scoped coalescing subscriber. Only messages published via SSEBroker.PublishTo with a matching scope are delivered, and rapid updates are coalesced to the latest value.

func (*SSEBroker) SubscribeScopedWithFilter added in v0.4.46

func (b *SSEBroker) SubscribeScopedWithFilter(topic, scope string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())

SubscribeScopedWithFilter registers a scoped subscriber with a predicate filter. Only messages published via SSEBroker.PublishTo with a matching scope AND passing the predicate are delivered. Non-matching messages are silently skipped.

func (*SSEBroker) SubscribeScopedWithRate added in v0.4.45

func (b *SSEBroker) SubscribeScopedWithRate(topic, scope string, rate Rate) (msgs <-chan string, unsubscribe func())

SubscribeScopedWithRate registers a scoped subscriber with per-subscriber rate limiting. See SSEBroker.SubscribeWithRate for rate-limiting semantics.

func (*SSEBroker) SubscribeWith added in v0.4.52

func (b *SSEBroker) SubscribeWith(topic string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())

SubscribeWith creates a subscription using composable options. It combines the functionality of Subscribe, SubscribeScoped, SubscribeWithFilter, SubscribeWithRate, SubscribeWithMeta, and SubscribeWithSnapshot into a single call.

func (*SSEBroker) SubscribeWithCoalescing added in v0.4.48

func (b *SSEBroker) SubscribeWithCoalescing(topic string) (msgs <-chan string, unsubscribe func())

SubscribeWithCoalescing registers a subscriber that automatically coalesces rapid updates. When multiple messages are published before the subscriber reads, only the latest value is delivered. This is ideal for high-frequency data like stock tickers or sensor readings where intermediate values are irrelevant.

Replaced (coalesced) messages do not count as drops in SSEBroker.PublishDrops. The coalescing channel uses an internal atomic pointer so updates are lock-free in the publish path.

The returned channel receives the latest message whenever a new value is available. Call the returned function to unsubscribe and close the channel. The unsubscribe function is safe to call more than once.

func (*SSEBroker) SubscribeWithFilter added in v0.4.46

func (b *SSEBroker) SubscribeWithFilter(topic string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())

SubscribeWithFilter registers a subscriber with a predicate filter for the given topic. Only messages for which the predicate returns true are delivered to the subscriber's channel. Non-matching messages are silently skipped and do not count toward drop counts or backpressure.

The predicate runs in the publish goroutine — keep it fast.

func (*SSEBroker) SubscribeWithMeta added in v0.4.34

func (b *SSEBroker) SubscribeWithMeta(topic string, meta SubscribeMeta) (msgs <-chan string, unsubscribe func())

SubscribeWithMeta registers a subscriber with optional metadata. Behaves like SSEBroker.Subscribe but attaches metadata queryable via SSEBroker.Subscribers.

func (*SSEBroker) SubscribeWithRate added in v0.4.45

func (b *SSEBroker) SubscribeWithRate(topic string, rate Rate) (msgs <-chan string, unsubscribe func())

SubscribeWithRate registers a subscriber with per-subscriber rate limiting. Messages published faster than the configured rate are held, and the most recent held message is delivered when the interval elapses (latest-wins). Rate limiting is per-subscriber and does not affect the publisher or other subscribers.

func (*SSEBroker) SubscribeWithSnapshot added in v0.4.34

func (b *SSEBroker) SubscribeWithSnapshot(topic string, snapshotFn func() string) (msgs <-chan string, unsubscribe func())

SubscribeWithSnapshot subscribes to a topic and immediately sends the result of snapshotFn as the first message before any live publishes. The snapshot function runs while the subscription is being registered, ensuring no messages are missed between the snapshot and live stream. If snapshotFn returns an empty string, no snapshot is sent.

This eliminates the dual-render pattern where page handlers and publishers independently render the same initial state.

func (*SSEBroker) SubscriberCount added in v0.3.0

func (b *SSEBroker) SubscriberCount() int

SubscriberCount returns the total number of active subscribers across all topics, including both unscoped and scoped subscribers.

func (*SSEBroker) Subscribers added in v0.4.34

func (b *SSEBroker) Subscribers(topic string) []SubscriberInfo

Subscribers returns a snapshot of all active subscribers for the given topic. The returned slice is a copy and safe to read without synchronization.

func (*SSEBroker) TopicCounts

func (b *SSEBroker) TopicCounts() map[string]int

TopicCounts returns a snapshot of the number of active subscribers per topic. The returned map is a copy and safe to read without synchronization. Counts include both unscoped and scoped subscribers.

func (*SSEBroker) UnsubscribeGlob added in v0.4.46

func (b *SSEBroker) UnsubscribeGlob(ch <-chan TopicMessage)

UnsubscribeGlob is a convenience alias: callers may pass the channel returned by [SubscribeGlob] but the idiomatic approach is to call the unsubscribe function returned alongside the channel. This method finds and removes the glob subscription associated with ch, closing the channel.

func (*SSEBroker) Use added in v0.4.41

func (b *SSEBroker) Use(mw Middleware)

Use registers global middleware that runs on every publish regardless of topic. Middleware executes in registration order (first registered = outermost wrapper). It must be called before any publishes; adding middleware while publishing is safe but the new middleware only takes effect on subsequent publishes.

Example

ExampleSSEBroker_Use demonstrates global publish middleware. Middleware wraps every publish call and can transform messages, add logging, or swallow publishes entirely.

package main

import (
	"fmt"
	"strings"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	// Add a middleware that uppercases all messages.
	broker.Use(func(next tavern.PublishFunc) tavern.PublishFunc {
		return func(topic, msg string) {
			next(topic, strings.ToUpper(msg))
		}
	})

	ch, unsub := broker.Subscribe("events")
	defer unsub()

	broker.Publish("events", "hello world")

	msg := <-ch
	fmt.Println(msg)
}
Output:
HELLO WORLD

func (*SSEBroker) UseTopics added in v0.4.41

func (b *SSEBroker) UseTopics(pattern string, mw Middleware)

UseTopics registers middleware that runs only when the topic matches the given pattern. Pattern matching uses simple wildcard rules:

  • An asterisk (*) matches any sequence of characters within a single segment (between colons).
  • A pattern without wildcards must match the topic exactly.

Examples: "orders:*" matches "orders:list" and "orders:detail" but not "orders:detail:item".

Example

ExampleSSEBroker_UseTopics demonstrates topic-scoped middleware that only runs when the publish topic matches a pattern.

package main

import (
	"fmt"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	// Add middleware only for topics matching "log:*".
	broker.UseTopics("log:*", func(next tavern.PublishFunc) tavern.PublishFunc {
		return func(topic, msg string) {
			next(topic, "[LOG] "+msg)
		}
	})

	logCh, unsub1 := broker.Subscribe("log:app")
	defer unsub1()
	dataCh, unsub2 := broker.Subscribe("data")
	defer unsub2()

	broker.Publish("log:app", "request handled")
	broker.Publish("data", "raw value")

	fmt.Println(<-logCh)
	fmt.Println(<-dataCh)
}
Output:
[LOG] request handled
raw value

type SSEHandlerOption added in v0.4.34

type SSEHandlerOption func(*sseHandler)

SSEHandlerOption configures the SSE handler.

func WithMaxConnectionDuration added in v0.4.57

func WithMaxConnectionDuration(d time.Duration) SSEHandlerOption

WithMaxConnectionDuration sets a maximum lifetime for SSE connections. After the configured duration (plus 0-10% random jitter to prevent thundering herd), the handler sends a retry: directive and an SSE comment, then closes the connection. The browser's EventSource will automatically reconnect with Last-Event-ID, providing seamless resumption.

A zero or negative duration disables the limit.

func WithReconnectDelay added in v0.4.63

func WithReconnectDelay(delay time.Duration) SSEHandlerOption

WithReconnectDelay sets the SSE retry: value (in milliseconds) sent when a connection is closed due to WithMaxConnectionDuration. The browser's EventSource uses this value to determine how long to wait before reconnecting. A zero or negative value defaults to 1000ms.

func WithSSEWriter added in v0.4.34

func WithSSEWriter(fn SSEWriterFunc) SSEHandlerOption

WithSSEWriter overrides the default message writer. The provided function is called for each message and is responsible for writing to the response and flushing if needed. Use this to integrate with libraries like htmx-go or to add custom SSE formatting.

Example

ExampleWithSSEWriter demonstrates a custom SSE writer that formats messages with a prefix before writing them to the HTTP response.

package main

import (
	"fmt"
	"net/http"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker()
	defer broker.Close()

	customWriter := tavern.WithSSEWriter(func(w http.ResponseWriter, msg string) error {
		_, err := fmt.Fprintf(w, "data: %s\n\n", msg)
		return err
	})

	handler := broker.SSEHandler("events", customWriter)
	mux := http.NewServeMux()
	mux.Handle("/sse", handler)

	fmt.Println("custom writer handler registered")
}
Output:
custom writer handler registered

type SSEMessage

type SSEMessage struct {
	// Event is the SSE event type (the "event:" field).
	Event string
	// Data is the event payload (the "data:" field).
	Data string
	// ID is the optional event identifier (the "id:" field). When set, the
	// browser will send it back as Last-Event-ID on reconnection.
	ID string
	// Retry is the optional reconnection time in milliseconds (the "retry:"
	// field). A zero value omits the field.
	Retry int
}

SSEMessage represents a complete Server-Sent Event message conforming to the W3C SSE specification (https://html.spec.whatwg.org/multipage/server-sent-events.html). Use NewSSEMessage to create one with the required fields, then chain SSEMessage.WithID or SSEMessage.WithRetry for optional fields.

func NewSSEMessage

func NewSSEMessage(event, data string) SSEMessage

NewSSEMessage creates an SSEMessage with the required event type and data payload. Use the builder methods SSEMessage.WithID and SSEMessage.WithRetry to set optional fields.

func (SSEMessage) String

func (m SSEMessage) String() string

String formats the SSEMessage as wire-format SSE text, terminated by a double newline as required by the specification.

func (SSEMessage) WithID

func (m SSEMessage) WithID(id string) SSEMessage

WithID returns a copy of the message with the given event ID set. The browser uses this ID for reconnection via the Last-Event-ID header.

func (SSEMessage) WithRetry

func (m SSEMessage) WithRetry(ms int) SSEMessage

WithRetry returns a copy of the message with the reconnection time set to ms milliseconds. The browser will wait this long before attempting to reconnect after a connection loss.

type SSEWriterFunc added in v0.4.34

type SSEWriterFunc func(w http.ResponseWriter, msg string) error

SSEWriterFunc writes a message to the HTTP response. It is called for each message received from the subscriber channel. The default writer calls fmt.Fprint followed by Flush. Override with WithSSEWriter to use custom formatting (e.g., htmx-go).

type ScheduledPublisher added in v0.4.24

type ScheduledPublisher struct {
	// contains filtered or unexported fields
}

ScheduledPublisher manages multiple named sections with independent update intervals. It ticks on a fast base interval (default 100ms), renders due sections into a shared buffer, and publishes one batched message per tick. It automatically skips rendering when no subscribers are connected to the topic. ScheduledPublisher is safe for concurrent use; sections can be registered while the publisher is running.

func (*ScheduledPublisher) Register added in v0.4.24

func (p *ScheduledPublisher) Register(name string, interval time.Duration, fn RenderFunc, opts ...SectionOptions)

Register adds a named section with an interval and render function. Sections are rendered in registration order.

func (*ScheduledPublisher) SetInterval added in v0.4.24

func (p *ScheduledPublisher) SetInterval(name string, interval time.Duration) bool

SetInterval changes the interval of a registered section at runtime. Returns false if the section name is not found.

func (*ScheduledPublisher) Start added in v0.4.24

func (p *ScheduledPublisher) Start(ctx context.Context)

Start begins the publish loop. It blocks until ctx is cancelled. Typically called via broker.RunPublisher(ctx, pub.Start) or go pub.Start(ctx).

type ScheduledPublisherOption added in v0.4.24

type ScheduledPublisherOption func(*ScheduledPublisher)

ScheduledPublisherOption configures the scheduled publisher.

func WithBaseTick added in v0.4.24

func WithBaseTick(d time.Duration) ScheduledPublisherOption

WithBaseTick sets the base tick interval. Default is 100ms.

type SectionOptions added in v0.4.42

type SectionOptions struct {
	// CircuitBreaker enables circuit breaker protection for the section.
	// When nil, the section renders normally without circuit breaker logic.
	CircuitBreaker *CircuitBreakerConfig
}

SectionOptions configures optional behavior for a registered section. Pass as the last argument to ScheduledPublisher.Register.

type StreamSSEOption added in v0.4.74

type StreamSSEOption func(*streamSSEConfig)

StreamSSEOption configures StreamSSE behavior.

func WithStreamHeartbeat added in v0.4.74

func WithStreamHeartbeat(interval time.Duration) StreamSSEOption

WithStreamHeartbeat enables periodic SSE comment heartbeats while streaming. If interval is greater than zero, a ": keepalive\n\n" comment is written and flushed every interval to keep intermediaries (proxies, browsers) from closing idle connections. A zero or negative interval disables heartbeats.

Tavern's broker-level keepalive (set via WithKeepalive on the broker) emits comments to all subscribers; WithStreamHeartbeat is the per-connection equivalent for handlers built directly on StreamSSE.

func WithStreamSnapshot added in v0.4.74

func WithStreamSnapshot(fn func() string) StreamSSEOption

WithStreamSnapshot registers a snapshot function that is called once, after SSE headers are written and before any channel values are streamed. The returned string is written verbatim as the initial frame. Return an empty string to skip the snapshot without disabling streaming.

Use this to deliver server-rendered initial state to new subscribers.

func WithStreamWriter added in v0.4.74

func WithStreamWriter(fn SSEWriterFunc) StreamSSEOption

WithStreamWriter overrides the default SSE frame writer used by StreamSSE. The default writer calls fmt.Fprint followed by http.Flusher.Flush for each frame. Override to integrate with libraries like htmx-go or to add custom SSE formatting.

type SubscribeMeta added in v0.4.34

type SubscribeMeta struct {
	// ID is an identifier for this subscriber (e.g., session ID, user ID).
	ID string
	// Meta is arbitrary key-value metadata.
	Meta map[string]string
}

SubscribeMeta holds optional metadata for SSEBroker.SubscribeWithMeta. The ID is used for targeted operations like SSEBroker.Disconnect and SSEBroker.AddTopic.

type SubscribeOption added in v0.4.52

type SubscribeOption func(*subscribeConfig)

SubscribeOption configures a composable subscription created via SSEBroker.SubscribeWith, SSEBroker.SubscribeMultiWith, or SSEBroker.SubscribeGlobWith.

func SubWithFilter added in v0.4.52

func SubWithFilter(fn FilterPredicate) SubscribeOption

SubWithFilter attaches a filter predicate. Only messages for which the predicate returns true are delivered.

func SubWithMeta added in v0.4.52

func SubWithMeta(m SubscribeMeta) SubscribeOption

SubWithMeta attaches subscriber metadata queryable via SSEBroker.Subscribers.

func SubWithRate added in v0.4.52

func SubWithRate(r Rate) SubscribeOption

SubWithRate enables per-subscriber rate limiting.

func SubWithScope added in v0.4.52

func SubWithScope(scope string) SubscribeOption

SubWithScope sets the subscription scope. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered.

func SubWithSnapshot added in v0.4.52

func SubWithSnapshot(fn func() string) SubscribeOption

SubWithSnapshot provides a snapshot function whose result is delivered as the first message before any live publishes.

type SubscriberInfo added in v0.4.34

type SubscriberInfo struct {
	// ID is the caller-provided identifier (empty if not set).
	ID string
	// Topic is the topic this subscriber is on.
	Topic string
	// Scope is the scope key (empty for unscoped subscribers).
	Scope string
	// ConnectedAt is when the subscription was created.
	ConnectedAt time.Time
	// Meta is caller-provided key-value metadata.
	Meta map[string]string
}

SubscriberInfo describes an active subscriber. Retrieve a snapshot of all subscribers for a topic via SSEBroker.Subscribers.

type TTLOption added in v0.4.44

type TTLOption func(*ttlConfig)

TTLOption configures optional behavior for TTL-based publish methods such as SSEBroker.PublishWithTTL and SSEBroker.PublishToWithTTL.

func WithAutoRemove added in v0.4.44

func WithAutoRemove(elementID string) TTLOption

WithAutoRemove configures a TTL publish to automatically send an OOB delete fragment for the given element ID when the message expires from the replay cache. This removes the element from currently-connected clients' DOM.

Example

ExampleWithAutoRemove demonstrates using WithAutoRemove to automatically send an OOB delete fragment when a TTL message expires.

package main

import (
	"fmt"
	"time"

	"github.com/catgoose/tavern"
)

func main() {
	broker := tavern.NewSSEBroker(tavern.WithMessageTTLSweep(10 * time.Millisecond))
	defer broker.Close()

	broker.SetReplayPolicy("toasts", 10)

	ch, unsub := broker.Subscribe("toasts")
	defer unsub()

	broker.PublishWithTTL("toasts", "<div id=\"toast-1\">Notice</div>", 30*time.Millisecond,
		tavern.WithAutoRemove("toast-1"),
	)

	msg := <-ch
	fmt.Println(msg)
}
Output:
<div id="toast-1">Notice</div>

type TopicMessage added in v0.4.34

type TopicMessage struct {
	// Topic is the name of the topic the message was published to.
	Topic string
	// Data is the published message payload.
	Data string
}

TopicMessage pairs a message with the topic it was published on. It is returned by multiplexed subscription methods such as SSEBroker.SubscribeMulti and SSEBroker.SubscribeGlob.

type TopicMetrics added in v0.4.31

type TopicMetrics struct {
	// Published is the total number of messages successfully delivered to at
	// least one subscriber on this topic.
	Published int64
	// Dropped is the total number of delivery failures (subscriber buffer full)
	// on this topic.
	Dropped int64
	// PeakSubscribers is the highest number of concurrent subscribers observed
	// on this topic since the broker was created.
	PeakSubscribers int
}

TopicMetrics holds per-topic counters. All fields are cumulative since the broker was created (except PeakSubscribers which is a high-water mark).

type TopicObservability added in v0.4.46

type TopicObservability struct {
	PublishLatency      LatencyHistogram
	SubscriberLag       map[string]int // subscriberID -> buffer depth
	ConnectionDurations []time.Duration
	Throughput          float64 // msgs/sec over last window
	EvictionCount       int64
}

TopicObservability holds observability data for a single topic. All fields are populated based on the features enabled in ObservabilityConfig; disabled features produce zero values.

Directories

Path Synopsis
Package backend defines the pluggable interface for cross-process fan-out in tavern.
Package backend defines the pluggable interface for cross-process fan-out in tavern.
memory
Package memory provides an in-process Backend implementation for testing and single-instance deployments.
Package memory provides an in-process Backend implementation for testing and single-instance deployments.
Package presence provides structured presence tracking built on top of a tavern SSE broker.
Package presence provides structured presence tracking built on top of a tavern SSE broker.
Package taverntest provides test helpers for tavern-based applications.
Package taverntest provides test helpers for tavern-based applications.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL