Documentation
¶
Overview ¶
Package tavern provides a thread-safe, topic-based pub/sub broker for Server-Sent Events (SSE). It is designed for fan-out messaging where a server publishes events and multiple HTTP clients consume them via SSE streams.
All broker methods are safe for concurrent use by multiple goroutines.
Index ¶
- Constants
- Variables
- func RenderComponent(cmp Component) string
- func RenderComponentErr(cmp Component) (string, error)
- func RenderFragments(fragments ...Fragment) string
- func StreamSSE[T any](ctx context.Context, w http.ResponseWriter, ch <-chan T, encode func(T) string, ...) error
- type AdaptiveBackpressure
- type BackpressureTier
- type BrokerMetrics
- type BrokerOption
- func WithAdaptiveBackpressure(cfg AdaptiveBackpressure) BrokerOption
- func WithAdmissionControl(fn func(topic string, currentCount int) bool) BrokerOption
- func WithBackend(b backend.Backend) BrokerOption
- func WithBufferSize(size int) BrokerOption
- func WithConnectionEvents(metaTopic string) BrokerOption
- func WithDropOldest() BrokerOption
- func WithKeepalive(interval time.Duration) BrokerOption
- func WithLogger(l *slog.Logger) BrokerOption
- func WithMaxSubscribers(n int) BrokerOption
- func WithMaxSubscribersPerTopic(n int) BrokerOption
- func WithMessageTTLSweep(interval time.Duration) BrokerOption
- func WithMetrics() BrokerOption
- func WithObservability(config ObservabilityConfig) BrokerOption
- func WithReplayStore(store ReplayStore) BrokerOption
- func WithSlowSubscriberCallback(fn func(topic string)) BrokerOption
- func WithSlowSubscriberEviction(threshold int) BrokerOption
- func WithTopicTTL(ttl time.Duration) BrokerOption
- type BrokerStats
- type CircuitBreakerConfig
- type Component
- type FilterPredicate
- type Fragment
- func Append(id, html string) Fragment
- func AppendComponent(id string, cmp Component) Fragment
- func Delete(id string) Fragment
- func Prepend(id, html string) Fragment
- func PrependComponent(id string, cmp Component) Fragment
- func Replace(id, html string) Fragment
- func ReplaceComponent(id string, cmp Component) Fragment
- type GapStrategy
- type LatencyHistogram
- type MemoryReplayStore
- func (m *MemoryReplayStore) AfterID(_ context.Context, topic, lastID string, _ int) ([]ReplayEntry, bool, error)
- func (m *MemoryReplayStore) Append(_ context.Context, topic string, entry ReplayEntry) error
- func (m *MemoryReplayStore) DeleteTopic(_ context.Context, topic string) error
- func (m *MemoryReplayStore) Latest(_ context.Context, topic string, limit int) ([]ReplayEntry, error)
- func (m *MemoryReplayStore) SetMaxEntries(_ context.Context, topic string, n int) error
- type Middleware
- type MutationEvent
- type ObservabilityConfig
- type ObservabilitySnapshot
- type PublishBatch
- func (pb *PublishBatch) Discard()
- func (pb *PublishBatch) Flush()
- func (pb *PublishBatch) Publish(topic, msg string)
- func (pb *PublishBatch) PublishOOB(topic string, fragments ...Fragment)
- func (pb *PublishBatch) PublishOOBTo(topic, scope string, fragments ...Fragment)
- func (pb *PublishBatch) PublishTo(topic, scope, msg string)
- func (pb *PublishBatch) PublishWithID(topic, id, msg string)
- func (pb *PublishBatch) PublishWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption)
- type PublishFunc
- type PublisherFunc
- type Rate
- type ReconnectCallback
- type ReconnectInfo
- type RenderError
- type RenderFunc
- type ReplayEntry
- type ReplayGapCallback
- type ReplayStore
- type SSEBroker
- func (b *SSEBroker) AddTopic(subscriberID, topic string, sendControl bool) bool
- func (b *SSEBroker) AddTopicForScope(scope, topic string, sendControl bool) int
- func (b *SSEBroker) After(topic string, fn func())
- func (b *SSEBroker) Batch() *PublishBatch
- func (b *SSEBroker) ClearDedup(topic string)
- func (b *SSEBroker) ClearReplay(topic string)
- func (b *SSEBroker) Close()
- func (b *SSEBroker) DefineGroup(name string, topics []string)
- func (b *SSEBroker) Disconnect(topic, subscriberID string) bool
- func (b *SSEBroker) DynamicGroup(name string, fn func(r *http.Request) []string)
- func (b *SSEBroker) DynamicGroupHandler(name string, opts ...SSEHandlerOption) http.Handler
- func (b *SSEBroker) GroupHandler(name string, opts ...SSEHandlerOption) http.Handler
- func (b *SSEBroker) HasSubscribers(topic string) bool
- func (b *SSEBroker) Metrics() BrokerMetrics
- func (b *SSEBroker) NewScheduledPublisher(event string, opts ...ScheduledPublisherOption) *ScheduledPublisher
- func (b *SSEBroker) NotifyMutate(resource string, event MutationEvent)
- func (b *SSEBroker) Observability() *observabilityState
- func (b *SSEBroker) OnBackpressureTierChange(fn func(sub *SubscriberInfo, oldTier, newTier BackpressureTier))
- func (b *SSEBroker) OnFirstSubscriber(topic string, fn func(topic string))
- func (b *SSEBroker) OnLastUnsubscribe(topic string, fn func(topic string))
- func (b *SSEBroker) OnMutate(resource string, fn func(MutationEvent))
- func (b *SSEBroker) OnPublishDrop(fn func(topic string, droppedForCount int))
- func (b *SSEBroker) OnReconnect(topic string, fn ReconnectCallback)
- func (b *SSEBroker) OnRenderError(fn func(*RenderError))
- func (b *SSEBroker) OnReplayGap(topic string, fn ReplayGapCallback)
- func (b *SSEBroker) Publish(topic, msg string)
- func (b *SSEBroker) PublishBlocking(topic, msg string, timeout time.Duration) error
- func (b *SSEBroker) PublishBlockingTo(topic, scope, msg string, timeout time.Duration) error
- func (b *SSEBroker) PublishDebounced(topic, msg string, after time.Duration)
- func (b *SSEBroker) PublishDrops() int64
- func (b *SSEBroker) PublishIfChanged(topic, msg string) bool
- func (b *SSEBroker) PublishIfChangedOOB(topic string, fragments ...Fragment) bool
- func (b *SSEBroker) PublishIfChangedOOBTo(topic, scope string, fragments ...Fragment) bool
- func (b *SSEBroker) PublishIfChangedTo(topic, scope, msg string) bool
- func (b *SSEBroker) PublishIfChangedWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption) bool
- func (b *SSEBroker) PublishLazyIfChangedOOB(topic string, renderFn func() []Fragment) bool
- func (b *SSEBroker) PublishLazyIfChangedOOBTo(topic, scope string, renderFn func() []Fragment) bool
- func (b *SSEBroker) PublishLazyOOB(topic string, renderFn func() []Fragment)
- func (b *SSEBroker) PublishLazyOOBTo(topic, scope string, renderFn func() []Fragment)
- func (b *SSEBroker) PublishOOB(topic string, fragments ...Fragment)
- func (b *SSEBroker) PublishOOBTo(topic, scope string, fragments ...Fragment)
- func (b *SSEBroker) PublishOOBWithTTL(topic string, ttl time.Duration, fragments ...Fragment)
- func (b *SSEBroker) PublishThrottled(topic, msg string, interval time.Duration)
- func (b *SSEBroker) PublishTo(topic, scope, msg string)
- func (b *SSEBroker) PublishToWithTTL(topic, scope, msg string, ttl time.Duration, opts ...TTLOption)
- func (b *SSEBroker) PublishWithID(topic, id, msg string)
- func (b *SSEBroker) PublishWithReplay(topic, msg string)
- func (b *SSEBroker) PublishWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption)
- func (b *SSEBroker) RemoveTopic(subscriberID, topic string, sendControl bool) bool
- func (b *SSEBroker) RemoveTopicForScope(scope, topic string, sendControl bool) int
- func (b *SSEBroker) RunPublisher(ctx context.Context, fn PublisherFunc)
- func (b *SSEBroker) SSEHandler(topic string, opts ...SSEHandlerOption) http.Handler
- func (b *SSEBroker) SetBundleOnReconnect(topic string, bundle bool)
- func (b *SSEBroker) SetOrdered(topic string, enabled bool)
- func (b *SSEBroker) SetReplayGapPolicy(topic string, strategy GapStrategy, snapshotFn func() string)
- func (b *SSEBroker) SetReplayPolicy(topic string, n int)
- func (b *SSEBroker) SetRetry(topic string, d time.Duration)
- func (b *SSEBroker) SetRetryAll(d time.Duration)
- func (b *SSEBroker) SetSimplifiedRenderer(topic string, fn func(string) string)
- func (b *SSEBroker) Stats() BrokerStats
- func (b *SSEBroker) Subscribe(topic string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeFromID(topic, lastEventID string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeFromIDWith(topic, lastEventID string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeGlob(pattern string) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeGlobScoped(pattern, scope string) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeGlobWith(pattern string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeMulti(topics ...string) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeMultiFromID(topics []string, lastEventID string) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeMultiWith(topics []string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeMultiWithMeta(meta SubscribeMeta, topics ...string) (msgs <-chan TopicMessage, unsubscribe func())
- func (b *SSEBroker) SubscribeScoped(topic, scope string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeScopedWithCoalescing(topic, scope string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeScopedWithFilter(topic, scope string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeScopedWithRate(topic, scope string, rate Rate) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWith(topic string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWithCoalescing(topic string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWithFilter(topic string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWithMeta(topic string, meta SubscribeMeta) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWithRate(topic string, rate Rate) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscribeWithSnapshot(topic string, snapshotFn func() string) (msgs <-chan string, unsubscribe func())
- func (b *SSEBroker) SubscriberCount() int
- func (b *SSEBroker) Subscribers(topic string) []SubscriberInfo
- func (b *SSEBroker) TopicCounts() map[string]int
- func (b *SSEBroker) UnsubscribeGlob(ch <-chan TopicMessage)
- func (b *SSEBroker) Use(mw Middleware)
- func (b *SSEBroker) UseTopics(pattern string, mw Middleware)
- type SSEHandlerOption
- type SSEMessage
- type SSEWriterFunc
- type ScheduledPublisher
- type ScheduledPublisherOption
- type SectionOptions
- type StreamSSEOption
- type SubscribeMeta
- type SubscribeOption
- type SubscriberInfo
- type TTLOption
- type TopicMessage
- type TopicMetrics
- type TopicObservability
Examples ¶
- PublishBatch.Discard
- SSEBroker (LifelineFallback)
- SSEBroker (LifelineHandoff)
- SSEBroker (LifelineReplay)
- SSEBroker (Pubsub)
- SSEBroker.After
- SSEBroker.Batch
- SSEBroker.DynamicGroupHandler
- SSEBroker.GroupHandler
- SSEBroker.OnFirstSubscriber
- SSEBroker.OnMutate
- SSEBroker.PublishDebounced
- SSEBroker.PublishThrottled
- SSEBroker.PublishWithID
- SSEBroker.PublishWithTTL
- SSEBroker.SetReplayPolicy
- SSEBroker.Stats
- SSEBroker.SubscribeFromID
- SSEBroker.SubscribeGlob
- SSEBroker.SubscribeMulti
- SSEBroker.Use
- SSEBroker.UseTopics
- WithAutoRemove
- WithSSEWriter
Constants ¶
const (
// TopicActivityFeed is a conventional topic name for site-wide activity feeds.
TopicActivityFeed = "activity-feed"
)
Topic name constants are conventions for common real-time use cases. Any string works as a topic name; these are provided for consistent naming.
Variables ¶
var ErrPublishTimeout = errors.New("tavern: publish timeout")
ErrPublishTimeout is returned by SSEBroker.PublishBlocking and SSEBroker.PublishBlockingTo when at least one subscriber's channel could not accept the message within the configured timeout.
Functions ¶
func RenderComponent ¶ added in v0.4.28
RenderComponent renders a Component to a string. If rendering fails, it returns an HTML comment containing the escaped error message.
func RenderComponentErr ¶ added in v0.4.42
RenderComponentErr renders a Component to a string, returning the error separately instead of embedding it in an HTML comment. This is useful when you want to handle render errors explicitly rather than silently embedding them in the output.
func RenderFragments ¶ added in v0.3.2
RenderFragments concatenates fragments into a single SSE-ready HTML string. Each fragment is wrapped with hx-swap-oob for HTMX OOB processing.
func StreamSSE ¶ added in v0.4.74
func StreamSSE[T any]( ctx context.Context, w http.ResponseWriter, ch <-chan T, encode func(T) string, opts ...StreamSSEOption, ) error
StreamSSE prepares an SSE response on w and streams frames from ch until ctx is cancelled or ch is closed. encode converts each channel value into the SSE frame to write; returning an empty string skips the value without terminating the stream.
StreamSSE sits between raw subscription channels and the higher-level SSEBroker.SSEHandler. It handles the mechanical parts of an SSE response — standard headers, http.Flusher verification, context cancellation, optional snapshot delivery, and optional heartbeats — while leaving the choice of subscription API, filtering, and message-to-frame conversion at the call site. It does not subscribe on your behalf.
Compose with any subscription API:
ch, unsub := broker.SubscribeScoped("orders", userID)
defer unsub()
return tavern.StreamSSE(r.Context(), w, ch, func(s string) string {
return s
})
For multiplexed channels that carry TopicMessage, the encoder typically wraps each value with the topic as the event type:
ch, unsub := broker.SubscribeMulti("orders", "invoices")
defer unsub()
return tavern.StreamSSE(r.Context(), w, ch, func(tm tavern.TopicMessage) string {
return tavern.NewSSEMessage(tm.Topic, tm.Data).String()
})
StreamSSE returns an error if w does not implement http.Flusher or if encode is nil. On normal termination — context cancellation, channel close, or write failure — it returns nil.
Types ¶
type AdaptiveBackpressure ¶ added in v0.4.46
type AdaptiveBackpressure struct {
// ThrottleAt is the consecutive drop count that triggers throttle tier.
// In throttle tier the broker delivers every 2nd message to the subscriber.
ThrottleAt int
// SimplifyAt is the consecutive drop count that triggers simplify tier.
// In simplify tier the broker attaches a fidelity hint and optionally
// applies a simplified renderer registered for the topic.
SimplifyAt int
// DisconnectAt is the consecutive drop count that triggers eviction.
DisconnectAt int
}
AdaptiveBackpressure configures tiered backpressure thresholds based on consecutive drop counts. Each threshold must be greater than the previous; a zero value disables that tier.
type BackpressureTier ¶ added in v0.4.46
type BackpressureTier int
BackpressureTier represents the current backpressure tier of a subscriber. Tiers escalate based on consecutive message drop counts.
const ( // TierNormal means messages are delivered normally (0 consecutive drops). TierNormal BackpressureTier = iota // TierThrottle means the subscriber is receiving every Nth message. TierThrottle // TierSimplify means the subscriber receives simplified/lower-fidelity content. TierSimplify // TierDisconnect means the subscriber will be evicted. TierDisconnect )
func (BackpressureTier) String ¶ added in v0.4.46
func (t BackpressureTier) String() string
String returns the tier name.
type BrokerMetrics ¶ added in v0.4.31
type BrokerMetrics struct {
// TopicStats maps topic name to its metrics.
TopicStats map[string]TopicMetrics
// TotalPublished is the sum of all per-topic published counts.
TotalPublished int64
// TotalDropped is the sum of all per-topic dropped counts.
TotalDropped int64
}
BrokerMetrics is a point-in-time snapshot of all broker metrics.
type BrokerOption ¶ added in v0.2.0
type BrokerOption func(*SSEBroker)
BrokerOption configures the SSE broker.
func WithAdaptiveBackpressure ¶ added in v0.4.46
func WithAdaptiveBackpressure(cfg AdaptiveBackpressure) BrokerOption
WithAdaptiveBackpressure enables tiered backpressure that adapts per-subscriber based on their consecutive drop count. This subsumes WithSlowSubscriberEviction: the DisconnectAt threshold acts as the eviction threshold.
Tiers from lowest to highest pressure:
- Normal (0 drops): messages delivered normally
- Throttle (≥ThrottleAt drops): delivers every 2nd message
- Simplify (≥SimplifyAt drops): applies simplified renderer if registered
- Disconnect (≥DisconnectAt drops): evicts the subscriber
The drop counter resets on any successful send, returning the subscriber to the normal tier automatically.
func WithAdmissionControl ¶ added in v0.4.49
func WithAdmissionControl(fn func(topic string, currentCount int) bool) BrokerOption
WithAdmissionControl sets a custom admission function that is called for every new subscription attempt. The function receives the topic name and the current total subscriber count for that topic. It should return true to allow the subscription or false to deny it.
func WithBackend ¶ added in v0.4.46
func WithBackend(b backend.Backend) BrokerOption
WithBackend configures the broker to use a cross-process fan-out backend. When set, every Publish also forwards the message to the backend, and the broker automatically subscribes to the backend when the first local subscriber joins a topic and unsubscribes when the last local subscriber leaves.
Messages arriving from the backend are dispatched directly to local subscriber channels — they skip middleware and After hooks to avoid duplicate side-effects across instances.
func WithBufferSize ¶ added in v0.2.0
func WithBufferSize(size int) BrokerOption
WithBufferSize sets the subscriber channel buffer size. Default is 10.
func WithConnectionEvents ¶ added in v0.4.34
func WithConnectionEvents(metaTopic string) BrokerOption
WithConnectionEvents enables publishing subscriber connect/disconnect events to the given meta topic. Events are JSON-formatted messages containing the event type, topic, and current subscriber count. The meta topic itself does not generate recursive events.
func WithDropOldest ¶ added in v0.4.24
func WithDropOldest() BrokerOption
WithDropOldest changes the subscriber buffer strategy from drop-newest (default) to drop-oldest. When a subscriber's buffer is full, the oldest buffered message is discarded to make room for the new one. This is useful for dashboards where the latest data is always more relevant than queued historical data.
func WithKeepalive ¶ added in v0.4.20
func WithKeepalive(interval time.Duration) BrokerOption
WithKeepalive enables periodic SSE comment keepalives sent to all subscribers at the given interval. This keeps connections alive through proxies and load balancers that close idle connections. A zero or negative interval disables keepalives (the default).
func WithLogger ¶ added in v0.4.18
func WithLogger(l *slog.Logger) BrokerOption
WithLogger sets a structured logger for the broker. When set, publisher panics and errors are logged. Default is nil (no logging).
func WithMaxSubscribers ¶ added in v0.4.49
func WithMaxSubscribers(n int) BrokerOption
WithMaxSubscribers sets a global limit on the total number of concurrent subscribers across all topics. Default is 0 (unlimited). When the limit is reached, new Subscribe calls return a nil channel and nil unsubscribe function, and SSEHandler returns HTTP 503 Service Unavailable.
func WithMaxSubscribersPerTopic ¶ added in v0.4.49
func WithMaxSubscribersPerTopic(n int) BrokerOption
WithMaxSubscribersPerTopic sets a per-topic limit on the number of concurrent subscribers. Default is 0 (unlimited). When the limit for a topic is reached, new Subscribe calls for that topic return a nil channel and nil unsubscribe function, and SSEHandler returns HTTP 503.
func WithMessageTTLSweep ¶ added in v0.4.44
func WithMessageTTLSweep(interval time.Duration) BrokerOption
WithMessageTTLSweep sets the interval at which the background goroutine checks for expired TTL entries in the replay cache. Default is 1 second. A shorter interval provides faster expiry at the cost of more frequent lock acquisitions. This option only takes effect when TTL publishes are used.
func WithMetrics ¶ added in v0.4.31
func WithMetrics() BrokerOption
WithMetrics enables per-topic publish and drop counters. When disabled (the default), metrics tracking has zero overhead. Use SSEBroker.Metrics to retrieve a snapshot.
func WithObservability ¶ added in v0.4.46
func WithObservability(config ObservabilityConfig) BrokerOption
WithObservability enables enhanced observability with the given configuration. Disabled by default — zero overhead when not configured.
func WithReplayStore ¶ added in v0.4.63
func WithReplayStore(store ReplayStore) BrokerOption
WithReplayStore sets an external ReplayStore for persisting replay entries. When configured, the broker delegates all replay read/write operations to the store instead of using its internal in-memory maps. This enables durable replay across process restarts or shared replay across multiple broker instances.
When no store is configured (the default), the broker uses its built-in in-memory replay, preserving existing behavior.
func WithSlowSubscriberCallback ¶ added in v0.4.34
func WithSlowSubscriberCallback(fn func(topic string)) BrokerOption
WithSlowSubscriberCallback sets a function that is called when a subscriber is evicted due to slow consumption. The callback receives the topic name and runs in its own goroutine.
func WithSlowSubscriberEviction ¶ added in v0.4.34
func WithSlowSubscriberEviction(threshold int) BrokerOption
WithSlowSubscriberEviction enables automatic disconnection of subscribers that have dropped more than threshold consecutive messages. When a subscriber is evicted, its channel is closed, triggering the client's EventSource to reconnect. The counter resets when a message is successfully delivered. A threshold of 0 disables eviction (the default).
func WithTopicTTL ¶ added in v0.4.21
func WithTopicTTL(ttl time.Duration) BrokerOption
WithTopicTTL sets how long a topic with zero subscribers may remain in the broker before it is automatically removed. A background goroutine sweeps at half the TTL interval. A zero or negative TTL disables auto-cleanup.
type BrokerStats ¶ added in v0.3.0
type BrokerStats struct {
// Topics is the number of active topics.
Topics int
// Subscribers is the total number of active subscribers across all topics.
Subscribers int
// PublishDrops is the cumulative number of dropped messages.
PublishDrops int64
}
BrokerStats is a point-in-time summary of broker state returned by SSEBroker.Stats.
type CircuitBreakerConfig ¶ added in v0.4.42
type CircuitBreakerConfig struct {
// FailureThreshold is the number of consecutive failures before the
// circuit opens. Must be at least 1.
FailureThreshold int
// RecoveryInterval is how long the circuit stays open before trying a
// half-open probe. Must be positive.
RecoveryInterval time.Duration
// FallbackRender is called when the circuit is open. If nil, the section
// is skipped while the circuit is open.
FallbackRender func() string
}
CircuitBreakerConfig configures a circuit breaker for a scheduled section. The circuit breaker follows the standard closed/open/half-open state machine.
type Component ¶ added in v0.4.11
Component renders itself to a writer. This interface is intentionally identical to templ.Component so that templ components can be passed directly without tavern importing the templ package.
type FilterPredicate ¶ added in v0.4.46
FilterPredicate is a function that returns true if the message should be delivered to the subscriber. It runs synchronously in the publish goroutine for every message on the topic, so implementations must be fast and non-blocking. Messages rejected by the predicate are silently skipped and do not count toward drop counts or backpressure tiers.
type Fragment ¶ added in v0.3.2
type Fragment struct {
// ID is the target DOM element ID.
ID string
// Swap is the hx-swap-oob value: "outerHTML", "innerHTML", "delete",
// "beforeend", or "afterbegin".
Swap string
// HTML is the inner HTML content. Empty for delete operations.
HTML string
}
Fragment describes a targeted DOM mutation for HTMX OOB swaps via SSE. Use the convenience constructors Replace, Append, Prepend, and Delete instead of building Fragment values directly.
func Append ¶ added in v0.3.2
Append creates a fragment that appends content to the end of an element.
func AppendComponent ¶ added in v0.4.11
AppendComponent renders a Component and returns an Append fragment. If rendering fails, the fragment contains the error message as an HTML comment.
func Prepend ¶ added in v0.3.2
Prepend creates a fragment that prepends content to the beginning of an element.
func PrependComponent ¶ added in v0.4.11
PrependComponent renders a Component and returns a Prepend fragment. If rendering fails, the fragment contains the error message as an HTML comment.
func ReplaceComponent ¶ added in v0.4.11
ReplaceComponent renders a Component and returns a Replace fragment. If rendering fails, the fragment contains the error message as an HTML comment.
type GapStrategy ¶ added in v0.4.39
type GapStrategy int
GapStrategy determines how the broker responds when a subscriber reconnects with a Last-Event-ID that is no longer in the replay log (i.e., the log has rolled over and the requested ID is gone). Configure per-topic via SSEBroker.SetReplayGapPolicy.
const ( // GapSilent is the default strategy. When a gap is detected, no replay // occurs and the subscriber receives only live messages going forward. // This preserves backwards compatibility with the existing behaviour. GapSilent GapStrategy = iota // GapFallbackToSnapshot uses the configured SnapshotFunc to generate a // full-state snapshot and delivers it to the subscriber before live // messages begin. This ensures the client can rebuild its state even // when the replay log has rolled over. GapFallbackToSnapshot )
type LatencyHistogram ¶ added in v0.4.46
LatencyHistogram holds percentile latency data computed from a circular buffer of the most recent 1024 samples.
type MemoryReplayStore ¶ added in v0.4.63
type MemoryReplayStore struct {
// contains filtered or unexported fields
}
MemoryReplayStore is an in-memory implementation of ReplayStore. It stores entries in ordered slices per topic with a configurable maximum size. All methods are safe for concurrent use.
func NewMemoryReplayStore ¶ added in v0.4.63
func NewMemoryReplayStore() *MemoryReplayStore
NewMemoryReplayStore creates a ready-to-use in-memory replay store.
func (*MemoryReplayStore) AfterID ¶ added in v0.4.63
func (m *MemoryReplayStore) AfterID(_ context.Context, topic, lastID string, _ int) ([]ReplayEntry, bool, error)
AfterID returns entries published after the given ID. If lastID is not found in the store, found is false and entries is nil (indicating a gap). Expired entries are filtered out.
func (*MemoryReplayStore) Append ¶ added in v0.4.63
func (m *MemoryReplayStore) Append(_ context.Context, topic string, entry ReplayEntry) error
Append stores a replay entry for a topic. If the number of entries exceeds the configured maximum for the topic, the oldest entries are discarded. The default maximum is 1 if not explicitly configured via MemoryReplayStore.SetMaxEntries.
func (*MemoryReplayStore) DeleteTopic ¶ added in v0.4.63
func (m *MemoryReplayStore) DeleteTopic(_ context.Context, topic string) error
DeleteTopic removes all replay entries for a topic and its max-size config.
func (*MemoryReplayStore) Latest ¶ added in v0.4.63
func (m *MemoryReplayStore) Latest(_ context.Context, topic string, limit int) ([]ReplayEntry, error)
Latest returns the most recent entries for a topic, up to limit. Expired entries are filtered out.
func (*MemoryReplayStore) SetMaxEntries ¶ added in v0.4.63
SetMaxEntries configures the maximum number of entries to retain for a topic. If n <= 0, all entries and the limit are removed (equivalent to DeleteTopic). If the current number of entries exceeds n, the oldest are discarded.
type Middleware ¶ added in v0.4.41
type Middleware func(next PublishFunc) PublishFunc
Middleware wraps a PublishFunc to add cross-cutting behaviour to the publish pipeline. Middleware is called in registration order (first registered = outermost) and may transform the message, add side-effects, or swallow the publish entirely by not calling next.
type MutationEvent ¶ added in v0.4.43
type MutationEvent struct {
// ID identifies the specific entity that was mutated (e.g., an order ID).
ID string
// Data holds the mutated entity or any additional context the handler needs.
Data any
}
MutationEvent carries context about a resource mutation. It is passed to handlers registered via SSEBroker.OnMutate when SSEBroker.NotifyMutate is called. Resources are logical entities (e.g., "orders") decoupled from topic names.
type ObservabilityConfig ¶ added in v0.4.46
type ObservabilityConfig struct {
// PublishLatency enables per-topic publish latency histograms.
PublishLatency bool
// SubscriberLag enables per-subscriber buffer depth gauges.
SubscriberLag bool
// ConnectionDuration enables tracking of subscriber connection durations.
ConnectionDuration bool
// TopicThroughput enables per-topic message rate calculation.
TopicThroughput bool
}
ObservabilityConfig controls which observability features are enabled. By default all fields are false and observability has zero overhead. Enable individual features selectively to minimize runtime cost.
type ObservabilitySnapshot ¶ added in v0.4.46
type ObservabilitySnapshot struct {
Topics map[string]TopicObservability
}
ObservabilitySnapshot is a point-in-time, export-friendly snapshot of all observability data across all topics. Obtain one via [observabilityState.Snapshot].
type PublishBatch ¶ added in v0.4.39
type PublishBatch struct {
// contains filtered or unexported fields
}
PublishBatch buffers publish operations and flushes them as a single concatenated write per subscriber channel, reducing the number of SSE writes on the wire. Create one via SSEBroker.Batch. Multiple goroutines may call Publish/PublishTo concurrently, but Flush and Discard must be called at most once and not concurrently with publishes.
func (*PublishBatch) Discard ¶ added in v0.4.39
func (pb *PublishBatch) Discard()
Discard clears all buffered operations without sending anything.
Example ¶
ExamplePublishBatch_Discard demonstrates discarding a batch without sending any messages.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe("updates")
defer unsub()
batch := broker.Batch()
batch.Publish("updates", "draft message")
batch.Discard()
// Verify nothing was delivered by publishing a sentinel.
broker.Publish("updates", "after-discard")
msg := <-ch
fmt.Println(msg)
}
Output: after-discard
func (*PublishBatch) Flush ¶ added in v0.4.39
func (pb *PublishBatch) Flush()
Flush sends all buffered messages. For each unique (topic, scope) combination the individual messages are concatenated, then routed through the same publish pipeline as SSEBroker.Publish / SSEBroker.PublishTo — middleware, rate limiting, filters, adaptive backpressure, glob subscribers, backend publish, observability, and after-hooks all execute exactly as they would for a regular publish. The concatenation preserves the batch's value proposition: each subscriber receives a single channel write containing all messages for that topic.
Flush should be called at most once; the batch is empty afterwards.
func (*PublishBatch) Publish ¶ added in v0.4.39
func (pb *PublishBatch) Publish(topic, msg string)
Publish buffers a message for all subscribers of the given topic.
func (*PublishBatch) PublishOOB ¶ added in v0.4.39
func (pb *PublishBatch) PublishOOB(topic string, fragments ...Fragment)
PublishOOB buffers OOB fragments for all subscribers of the given topic.
func (*PublishBatch) PublishOOBTo ¶ added in v0.4.39
func (pb *PublishBatch) PublishOOBTo(topic, scope string, fragments ...Fragment)
PublishOOBTo buffers OOB fragments for scoped subscribers matching the scope.
func (*PublishBatch) PublishTo ¶ added in v0.4.39
func (pb *PublishBatch) PublishTo(topic, scope, msg string)
PublishTo buffers a scoped message for subscribers matching the scope.
func (*PublishBatch) PublishWithID ¶ added in v0.4.52
func (pb *PublishBatch) PublishWithID(topic, id, msg string)
PublishWithID publishes a message with an associated event ID for Last-Event-ID resumption. Like PublishBatch.PublishWithTTL, this executes immediately rather than buffering.
func (*PublishBatch) PublishWithTTL ¶ added in v0.4.52
func (pb *PublishBatch) PublishWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption)
PublishWithTTL publishes a message with a TTL on the replay cache entry. Unlike other PublishBatch methods, this executes immediately rather than buffering because the TTL sweeper requires immediate processing.
type PublishFunc ¶ added in v0.4.41
type PublishFunc func(topic, msg string)
PublishFunc is the function signature for publish operations. Middleware wraps this to intercept, transform, or swallow publishes.
type PublisherFunc ¶ added in v0.4.18
PublisherFunc is a long-running function that publishes messages to the broker. It receives the broker's context and should return when the context is cancelled.
type Rate ¶ added in v0.4.45
type Rate struct {
// MaxPerSecond is a convenience field: converted to MinInterval internally.
MaxPerSecond float64
// MinInterval is the minimum time between deliveries to this subscriber.
MinInterval time.Duration
}
Rate configures per-subscriber rate limiting. When a subscriber is rate-limited, messages published faster than the configured rate are held and only the most recent held message is delivered when the interval elapses (latest-wins). If both MaxPerSecond and MinInterval are set, MinInterval takes precedence.
type ReconnectCallback ¶ added in v0.4.46
type ReconnectCallback func(info ReconnectInfo)
ReconnectCallback is invoked when a subscriber reconnects with a Last-Event-ID header. It fires on ALL reconnections regardless of whether the replay log can satisfy the request.
type ReconnectInfo ¶ added in v0.4.46
type ReconnectInfo struct {
// Topic is the topic the subscriber reconnected to.
Topic string
// SubscriberID is the caller-provided identifier (empty if not set via metadata).
SubscriberID string
// LastEventID is the Last-Event-ID sent by the client.
LastEventID string
// Gap is the time elapsed since the LastEventID was published. Zero if the
// ID was not found in the replay log (e.g., it has rolled out).
Gap time.Duration
// MissedCount is the number of messages published after LastEventID that
// the subscriber missed. Zero if the ID was not found in the replay log.
MissedCount int
// ReplayDelivered is the number of replay messages successfully enqueued
// to the subscriber channel. This may be less than MissedCount if the
// subscriber buffer was too small to hold all replay messages.
ReplayDelivered int
// ReplayDropped is the number of replay messages that could not be
// enqueued because the subscriber buffer was full.
ReplayDropped int
// SendToSubscriber sends a message directly to this subscriber's channel.
// The message is delivered as-is (raw SSE text). If the subscriber's buffer
// is full the message is dropped silently.
SendToSubscriber func(msg string)
}
ReconnectInfo provides context about a subscriber reconnection, including the gap duration and number of missed messages. It is passed to callbacks registered with SSEBroker.OnReconnect.
type RenderError ¶ added in v0.4.42
type RenderError struct {
// Topic is the broker topic or scheduled publisher event associated with the error.
Topic string
// Section is the section name within a ScheduledPublisher (empty for broker-level errors).
Section string
// Err is the underlying error returned by the render function.
Err error
// Timestamp is when the error occurred.
Timestamp time.Time
// Count is the current consecutive failure count.
Count int
}
RenderError contains structured information about a render failure. It implements the error interface and supports errors.Unwrap for the underlying error.
func (*RenderError) Error ¶ added in v0.4.42
func (e *RenderError) Error() string
Error implements the error interface.
func (*RenderError) Unwrap ¶ added in v0.4.42
func (e *RenderError) Unwrap() error
Unwrap returns the underlying error.
type RenderFunc ¶ added in v0.4.24
RenderFunc renders content into the provided buffer. It receives the context (which is cancelled when the scheduled publisher stops) and a shared buffer to write HTML into. Multiple sections write to the same buffer in a single tick, so output should be self-contained fragments.
type ReplayEntry ¶ added in v0.4.25
type ReplayEntry struct {
// ID is the SSE event identifier used for Last-Event-ID resumption.
ID string
// Msg is the raw message payload stored in the replay log.
Msg string
// ExpiresAt is when this entry should be purged from the replay cache.
// A zero value means the entry does not expire.
ExpiresAt time.Time
// AutoRemoveID is the DOM element ID to send an OOB delete fragment for
// when this entry expires. Empty means no auto-removal.
AutoRemoveID string
// PublishedAt records when the entry was originally published, used to
// compute reconnection gap durations.
PublishedAt time.Time
}
ReplayEntry pairs a message with its event ID for Last-Event-ID resumption support. When ExpiresAt is non-zero, the entry will be removed from the replay cache after that time (see SSEBroker.PublishWithTTL).
type ReplayGapCallback ¶ added in v0.4.39
type ReplayGapCallback func(sub *SubscriberInfo, lastEventID string)
ReplayGapCallback is invoked when a replay gap is detected for a subscriber. It receives the subscriber's info and the Last-Event-ID that could not be found in the replay log.
type ReplayStore ¶ added in v0.4.63
type ReplayStore interface {
// Append stores a replay entry for a topic.
Append(ctx context.Context, topic string, entry ReplayEntry) error
// AfterID returns entries published after the given ID.
// found indicates whether lastID exists in the store.
// If found=false, the broker treats this as a gap.
// Must not return expired entries (TTL filtering at read time).
AfterID(ctx context.Context, topic, lastID string, limit int) (entries []ReplayEntry, found bool, err error)
// Latest returns the most recent entries for a topic (for initial
// subscribe replay). Must not return expired entries.
Latest(ctx context.Context, topic string, limit int) ([]ReplayEntry, error)
// DeleteTopic removes all replay entries for a topic.
DeleteTopic(ctx context.Context, topic string) error
// SetMaxEntries configures the maximum number of entries to retain for a
// topic. When the limit is exceeded, the oldest entries are discarded.
// A value of 0 or less removes all entries and the limit for the topic.
SetMaxEntries(ctx context.Context, topic string, n int) error
}
ReplayStore is an abstraction for storing and retrieving replay entries. Implementations must be safe for concurrent use by multiple goroutines. The default in-memory implementation is MemoryReplayStore.
IDs are topic-scoped (not global). TTL filtering happens at read time: stores must not return expired entries from [ReplayStore.AfterID] or [ReplayStore.Latest].
type SSEBroker ¶
type SSEBroker struct {
// contains filtered or unexported fields
}
SSEBroker is a thread-safe, topic-based pub/sub message broker. Subscribers receive messages on a buffered channel and publishers fan out messages to all subscribers of a given topic. A zero-value SSEBroker is not usable; create one with NewSSEBroker.
Example (LifelineFallback) ¶
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
// Lifeline carries control-plane and fallback signals.
ch, unsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control", "fallback")
defer unsub()
// Scoped panel stream -- active while viewing the panel.
_, panelUnsub := broker.SubscribeScoped("panel-data", "user:1")
// User navigates away -- panel stream torn down.
panelUnsub()
// Lifeline still delivers fallback/invalidation signals.
broker.Publish("fallback", "data-stale")
msg := <-ch
fmt.Printf("topic=%s data=%s\n", msg.Topic, msg.Data)
}
Output: topic=fallback data=data-stale
Example (LifelineHandoff) ¶
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
// Lifeline: one persistent connection for app-shell events.
ch, unsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control")
defer unsub()
// User navigates to dashboard -- add topic dynamically.
broker.AddTopic("app", "dashboard", true)
// Control event confirms the topic was added.
ctrl := <-ch
fmt.Println("control event topic:", ctrl.Topic)
// Both topics now deliver through the single lifeline channel.
broker.Publish("dashboard", "chart-update")
msg := <-ch
fmt.Printf("topic=%s data=%s\n", msg.Topic, msg.Data)
// User navigates away -- remove dashboard topic.
broker.RemoveTopic("app", "dashboard", true)
// Drain the removal control event.
<-ch
}
Output: control event topic: tavern-topics-changed topic=dashboard data=chart-update
Example (LifelineReplay) ¶
package main
import (
"fmt"
"strings"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.SetReplayPolicy("panel", 10)
// Lifeline stays connected throughout.
lifeline, lifelineUnsub := broker.SubscribeMultiWithMeta(tavern.SubscribeMeta{ID: "app"}, "control")
defer lifelineUnsub()
// Publish panel events with IDs while panel stream is down.
broker.PublishWithID("panel", "e1", "update-1")
broker.PublishWithID("panel", "e2", "update-2")
// Panel stream reconnects with last known ID -- replay fills the gap.
panelCh, panelUnsub := broker.SubscribeFromID("panel", "e1")
defer panelUnsub()
// Skip the reconnected control event.
<-panelCh
// Replayed message arrives.
replayed := <-panelCh
fmt.Println("replayed:", extractData(replayed))
// Lifeline was never interrupted.
broker.Publish("control", "still-alive")
msg := <-lifeline
fmt.Printf("lifeline: topic=%s data=%s\n", msg.Topic, msg.Data)
}
// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {
var parts []string
for _, line := range strings.Split(msg, "\n") {
if !strings.HasPrefix(line, "id: ") {
parts = append(parts, line)
}
}
result := strings.Join(parts, "\n")
return strings.TrimSpace(result)
}
Output: replayed: update-2 lifeline: topic=control data=still-alive
Example (Pubsub) ¶
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
// Topic name conventions for dashboard and real-time UI applications. Your
// application may use any string as a topic name; these are provided as
// examples of consistent naming patterns.
const TopicSystemStats = "system-stats"
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe(TopicSystemStats)
defer unsub()
broker.Publish(TopicSystemStats, `{"cpu": 42}`)
msg := <-ch
fmt.Println(msg)
}
Output: {"cpu": 42}
func NewSSEBroker ¶
func NewSSEBroker(opts ...BrokerOption) *SSEBroker
NewSSEBroker creates a ready-to-use SSEBroker with no active topics or subscribers. It accepts optional BrokerOption values to override defaults.
func (*SSEBroker) AddTopic ¶ added in v0.4.46
AddTopic adds a topic to an existing subscriber identified by subscriberID. The subscriber starts receiving messages from the new topic without reconnecting. If the subscriber is already subscribed to the topic, this is a no-op and returns false. Returns true if the topic was successfully added.
If sendControl is true, a control event with type "tavern-topics-changed" is sent on the subscriber's channel so the client can react (e.g., set up new SSE-swap targets).
func (*SSEBroker) AddTopicForScope ¶ added in v0.4.46
AddTopicForScope adds a topic to all subscribers with the matching scope. Returns the number of subscribers that had the topic added.
func (*SSEBroker) After ¶ added in v0.4.43
After registers a callback that fires asynchronously after a successful publish to the named topic. Multiple After hooks per topic are allowed and execute in registration order. Hooks run in a new goroutine and do not block the publish path.
After hooks that publish to other topics may trigger further After hooks. To prevent infinite cycles, the broker enforces a maximum nesting depth of 8 and will skip hooks that would re-enter a topic already in the current chain.
Calling After on a closed broker is a no-op.
Example ¶
ExampleSSEBroker_After demonstrates After hooks that fire asynchronously after a successful publish. This is useful for triggering side effects like cache invalidation or dependent topic updates.
package main
import (
"fmt"
"sync"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
var done sync.WaitGroup
done.Add(1)
ch, unsub := broker.Subscribe("audit-log")
defer unsub()
broker.After("orders", func() {
broker.Publish("audit-log", "order topic updated")
done.Done()
})
broker.Publish("orders", `{"id": 1}`)
done.Wait()
msg := <-ch
fmt.Println(msg)
}
Output: order topic updated
func (*SSEBroker) Batch ¶ added in v0.4.39
func (b *SSEBroker) Batch() *PublishBatch
Batch creates a new PublishBatch that buffers publish calls against this broker. Call PublishBatch.Flush to send all buffered messages or PublishBatch.Discard to throw them away.
Example ¶
ExampleSSEBroker_Batch demonstrates atomic multi-message publishing. All messages in a batch are concatenated per topic and delivered as a single channel write, reducing SSE frame overhead.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe("updates")
defer unsub()
batch := broker.Batch()
batch.Publish("updates", "line1\n")
batch.Publish("updates", "line2\n")
batch.Flush()
// The subscriber receives all batched messages as one concatenated write.
msg := <-ch
fmt.Print(msg)
}
Output: line1 line2
func (*SSEBroker) ClearDedup ¶ added in v0.4.22
ClearDedup resets the deduplication state for the given topic so the next call to SSEBroker.PublishIfChanged will always publish regardless of content.
func (*SSEBroker) ClearReplay ¶ added in v0.4.19
ClearReplay removes the cached replay message for the given topic. Future subscribers will no longer receive a replayed message on connect.
func (*SSEBroker) Close ¶
func (b *SSEBroker) Close()
Close shuts down the broker. It first waits for all publisher goroutines started via SSEBroker.RunPublisher to return, then closes all subscriber channels and removes all topics. After Close returns, any pending reads on subscriber channels will receive the zero value. It is safe to call Close while other goroutines are publishing or subscribing; however, no new messages will be delivered after Close returns.
func (*SSEBroker) DefineGroup ¶ added in v0.4.40
DefineGroup registers a named static topic group. The group can later be served via SSEBroker.GroupHandler. Defining a group with the same name again replaces the previous definition.
func (*SSEBroker) Disconnect ¶ added in v0.4.34
Disconnect closes the subscriber channel with the given ID on the given topic. Returns true if the subscriber was found and disconnected.
func (*SSEBroker) DynamicGroup ¶ added in v0.4.40
DynamicGroup registers a named dynamic topic group. The provided function is evaluated per-request to determine which topics a given connection should subscribe to. This enables per-request authorization — different users can receive different topic sets from the same endpoint.
func (*SSEBroker) DynamicGroupHandler ¶ added in v0.4.40
func (b *SSEBroker) DynamicGroupHandler(name string, opts ...SSEHandlerOption) http.Handler
DynamicGroupHandler returns an http.Handler that streams SSE messages for topics determined per-request by the dynamic group's function. Each message is formatted with the topic as the SSE event type.
DynamicGroupHandler accepts the same SSEHandlerOption values as SSEBroker.SSEHandler. If the group name has not been defined, the handler responds with 404.
Example ¶
ExampleSSEBroker_DynamicGroupHandler demonstrates a dynamic topic group where the topics are resolved per-request. This enables per-user authorization so different users receive different topic sets from the same endpoint.
package main
import (
"fmt"
"net/http"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.DynamicGroup("user-feed", func(r *http.Request) []string {
userID := r.URL.Query().Get("user")
return []string{"global-feed", "user/" + userID + "/notifications"}
})
handler := broker.DynamicGroupHandler("user-feed")
mux := http.NewServeMux()
mux.Handle("/sse/feed", handler)
fmt.Println("dynamic handler registered")
}
Output: dynamic handler registered
func (*SSEBroker) GroupHandler ¶ added in v0.4.40
func (b *SSEBroker) GroupHandler(name string, opts ...SSEHandlerOption) http.Handler
GroupHandler returns an http.Handler that streams SSE messages for all topics in the named static group. Each message is formatted with the topic as the SSE event type and the published data as the data field.
GroupHandler accepts the same SSEHandlerOption values as SSEBroker.SSEHandler. If the group name has not been defined, the handler responds with 404.
Example ¶
ExampleSSEBroker_GroupHandler demonstrates a static topic group that multiplexes several topics onto a single SSE connection. This is useful when a dashboard page needs data from multiple topics without opening separate EventSource connections.
package main
import (
"fmt"
"net/http"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.DefineGroup("dashboard", []string{"metrics", "alerts", "status"})
handler := broker.GroupHandler("dashboard")
mux := http.NewServeMux()
mux.Handle("/sse/dashboard", handler)
fmt.Println("handler registered")
}
Output: handler registered
func (*SSEBroker) HasSubscribers ¶
HasSubscribers reports whether the given topic has at least one active subscriber, including both unscoped and scoped subscribers. This is useful for skipping expensive serialization when no clients are listening.
func (*SSEBroker) Metrics ¶ added in v0.4.31
func (b *SSEBroker) Metrics() BrokerMetrics
Metrics returns a point-in-time snapshot of per-topic and aggregate metrics. Returns an empty BrokerMetrics if metrics are not enabled (see WithMetrics).
func (*SSEBroker) NewScheduledPublisher ¶ added in v0.4.24
func (b *SSEBroker) NewScheduledPublisher(event string, opts ...ScheduledPublisherOption) *ScheduledPublisher
NewScheduledPublisher creates a publisher that publishes to the given event/topic on the broker.
func (*SSEBroker) NotifyMutate ¶ added in v0.4.43
func (b *SSEBroker) NotifyMutate(resource string, event MutationEvent)
NotifyMutate triggers all handlers registered via SSEBroker.OnMutate for the named resource. Handlers execute synchronously in registration order in the caller's goroutine. If no handlers are registered for the resource, this is a no-op.
func (*SSEBroker) Observability ¶ added in v0.4.46
func (b *SSEBroker) Observability() *observabilityState
Observability returns the observability handle for the broker. Returns nil if observability is not enabled.
func (*SSEBroker) OnBackpressureTierChange ¶ added in v0.4.46
func (b *SSEBroker) OnBackpressureTierChange(fn func(sub *SubscriberInfo, oldTier, newTier BackpressureTier))
OnBackpressureTierChange sets a callback that fires whenever a subscriber transitions between backpressure tiers. The callback receives the subscriber info and the old and new tiers. The callback runs in its own goroutine.
func (*SSEBroker) OnFirstSubscriber ¶ added in v0.4.23
OnFirstSubscriber registers a callback that fires when the given topic goes from zero to one total subscribers (counting both unscoped and scoped). The callback runs in its own goroutine and does not block Subscribe. Multiple hooks per topic are allowed and all will fire. Hooks persist across subscriber cycles. Calling this on a closed broker is a no-op.
Example ¶
ExampleSSEBroker_OnFirstSubscriber demonstrates lifecycle hooks that fire when the first subscriber joins a topic and when the last one leaves.
package main
import (
"fmt"
"sync"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
var firstDone, lastDone sync.WaitGroup
firstDone.Add(1)
lastDone.Add(1)
broker.OnFirstSubscriber("prices", func(topic string) {
fmt.Printf("first subscriber on %s\n", topic)
firstDone.Done()
})
broker.OnLastUnsubscribe("prices", func(topic string) {
fmt.Printf("last subscriber left %s\n", topic)
lastDone.Done()
})
_, unsub := broker.Subscribe("prices")
firstDone.Wait()
unsub()
lastDone.Wait()
}
Output: first subscriber on prices last subscriber left prices
func (*SSEBroker) OnLastUnsubscribe ¶ added in v0.4.23
OnLastUnsubscribe registers a callback that fires when the given topic goes from one to zero total subscribers (counting both unscoped and scoped). The callback runs in its own goroutine and does not block the unsubscribe call. Multiple hooks per topic are allowed and all will fire. Hooks persist across subscriber cycles. Calling this on a closed broker is a no-op.
func (*SSEBroker) OnMutate ¶ added in v0.4.43
func (b *SSEBroker) OnMutate(resource string, fn func(MutationEvent))
OnMutate registers a handler for the named resource. The handler fires when SSEBroker.NotifyMutate is called with the same resource name. Multiple handlers per resource are allowed and execute in registration order.
Resources are logical entities (e.g., "orders", "users") rather than topic names. This decouples the mutation signal from the specific topics that get updated.
Calling OnMutate on a closed broker is a no-op.
Example ¶
ExampleSSEBroker_OnMutate demonstrates the OnMutate/NotifyMutate pattern for decoupling resource mutations from topic updates.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe("order-updates")
defer unsub()
broker.OnMutate("orders", func(e tavern.MutationEvent) {
broker.Publish("order-updates", fmt.Sprintf("order %s changed", e.ID))
})
broker.NotifyMutate("orders", tavern.MutationEvent{ID: "42", Data: "shipped"})
msg := <-ch
fmt.Println(msg)
}
Output: order 42 changed
func (*SSEBroker) OnPublishDrop ¶ added in v0.4.51
OnPublishDrop registers a callback that fires each time a message is dropped during fan-out because a subscriber's channel buffer is full. The callback receives the topic name and the number of subscribers for whom delivery failed. Only one callback is supported; subsequent calls replace the previous one.
The callback runs synchronously in the publish goroutine — keep it fast.
func (*SSEBroker) OnReconnect ¶ added in v0.4.46
func (b *SSEBroker) OnReconnect(topic string, fn ReconnectCallback)
OnReconnect registers a callback that fires when a subscriber reconnects with a Last-Event-ID header for the given topic. Unlike SSEBroker.OnReplayGap, which only fires when the replay log cannot satisfy the request, OnReconnect fires on every reconnection. The callback runs in its own goroutine and does not block the subscription. Multiple callbacks per topic are allowed. Calling this on a closed broker is a no-op.
func (*SSEBroker) OnRenderError ¶ added in v0.4.42
func (b *SSEBroker) OnRenderError(fn func(*RenderError))
OnRenderError registers a callback that fires when a render function fails. This applies to lazy OOB renders and scheduled section renders. Only one callback is supported; subsequent calls replace the previous callback. The callback receives a *RenderError with structured information about the failure. The callback runs synchronously in the goroutine where the error occurred — avoid blocking operations.
func (*SSEBroker) OnReplayGap ¶ added in v0.4.39
func (b *SSEBroker) OnReplayGap(topic string, fn ReplayGapCallback)
OnReplayGap registers a callback that fires when a subscriber reconnects with a Last-Event-ID that is no longer present in the replay log for the given topic. The callback runs in its own goroutine and does not block the subscription. Multiple callbacks per topic are allowed and all will fire. Like SSEBroker.SetReplayGapPolicy, gap callbacks are only meaningful when the topic uses ID-backed replay (see SetReplayGapPolicy for details). Calling this on a closed broker is a no-op.
func (*SSEBroker) Publish ¶
Publish fans out msg to every subscriber of the given topic. It is non-blocking: if a subscriber's channel buffer is full, the message is silently dropped for that subscriber rather than blocking the publisher. Publishing to a topic with no subscribers is a no-op.
func (*SSEBroker) PublishBlocking ¶ added in v0.4.51
PublishBlocking behaves like SSEBroker.Publish but blocks up to timeout for each subscriber whose channel buffer is full. If any subscriber cannot accept the message within the timeout, ErrPublishTimeout is returned. A timeout of zero falls back to non-blocking behavior (equivalent to SSEBroker.Publish).
func (*SSEBroker) PublishBlockingTo ¶ added in v0.4.51
PublishBlockingTo behaves like SSEBroker.PublishTo but blocks up to timeout for each matching scoped subscriber whose channel buffer is full. Returns ErrPublishTimeout if any delivery times out.
func (*SSEBroker) PublishDebounced ¶ added in v0.4.24
PublishDebounced publishes msg to the topic after the given duration of quiet. If called again for the same topic before the duration elapses, the timer resets and only the latest message is published. This is useful for rapid state changes (typing indicators, slider drags) where intermediate states are noise. This method is safe for concurrent use.
Example ¶
ExampleSSEBroker_PublishDebounced demonstrates debounced publishing where only the last message in a rapid sequence is delivered after a quiet period.
package main
import (
"fmt"
"time"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe("search")
defer unsub()
// Simulate rapid typing — only the final value should be published.
broker.PublishDebounced("search", "h", 50*time.Millisecond)
broker.PublishDebounced("search", "he", 50*time.Millisecond)
broker.PublishDebounced("search", "hel", 50*time.Millisecond)
broker.PublishDebounced("search", "hello", 50*time.Millisecond)
msg := <-ch
fmt.Println(msg)
}
Output: hello
func (*SSEBroker) PublishDrops ¶ added in v0.3.0
PublishDrops returns the cumulative number of messages that were dropped because a subscriber's channel buffer was full.
func (*SSEBroker) PublishIfChanged ¶ added in v0.4.22
PublishIfChanged publishes msg to the given topic only when it differs from the last message published via PublishIfChanged for that topic. It returns true if the message was published (content changed) or false if it was skipped (identical to the previous message). Comparison is done via an FNV-64a hash of the message content.
The deduplication state is per-topic and independent of SSEBroker.Publish. Use SSEBroker.ClearDedup to reset the stored hash for a topic.
func (*SSEBroker) PublishIfChangedOOB ¶ added in v0.4.29
PublishIfChangedOOB renders the given fragments and publishes the result to the topic only if it differs from the last message published via SSEBroker.PublishIfChanged for that topic. Returns true if published (content changed), false if skipped (identical).
func (*SSEBroker) PublishIfChangedOOBTo ¶ added in v0.4.29
PublishIfChangedOOBTo renders the given fragments and publishes the result only to scoped subscribers of the topic whose scope matches, and only if the content differs from the last publish for that topic+scope. Returns true if published, false if skipped.
func (*SSEBroker) PublishIfChangedTo ¶ added in v0.4.29
PublishIfChangedTo publishes msg to scoped subscribers of the given topic only when it differs from the last message published via PublishIfChangedTo for that topic+scope combination. Returns true if published, false if skipped. Comparison is done via an FNV-64a hash of the message content.
func (*SSEBroker) PublishIfChangedWithTTL ¶ added in v0.4.44
func (b *SSEBroker) PublishIfChangedWithTTL(topic, msg string, ttl time.Duration, opts ...TTLOption) bool
PublishIfChangedWithTTL combines deduplication with TTL. The message is published only if it differs from the last PublishIfChanged value for the topic, and the replay cache entry expires after the given TTL. Returns true if published (content changed), false if skipped.
func (*SSEBroker) PublishLazyIfChangedOOB ¶ added in v0.4.33
PublishLazyIfChangedOOB calls renderFn only if the topic has subscribers, then publishes the rendered fragments only if the content differs from the last publish. Combines the subscriber guard with deduplication. Returns true if published (content changed), false if skipped (no subscribers, no fragments, or identical content).
func (*SSEBroker) PublishLazyIfChangedOOBTo ¶ added in v0.4.33
PublishLazyIfChangedOOBTo calls renderFn only if the topic has subscribers, then publishes the rendered fragments to scoped subscribers only if the content differs from the last publish for that topic+scope.
func (*SSEBroker) PublishLazyOOB ¶ added in v0.4.33
PublishLazyOOB calls renderFn only if the topic has subscribers, then publishes the rendered fragments. This avoids expensive rendering (DB queries, template execution) when nobody is listening. If renderFn returns no fragments, no message is published.
func (*SSEBroker) PublishLazyOOBTo ¶ added in v0.4.33
PublishLazyOOBTo calls renderFn only if the topic has subscribers, then publishes the rendered fragments to scoped subscribers matching the scope.
func (*SSEBroker) PublishOOB ¶ added in v0.3.2
PublishOOB renders the given fragments and publishes them as a single SSE event.
func (*SSEBroker) PublishOOBTo ¶ added in v0.4.0
PublishOOBTo renders the given fragments and publishes them only to scoped subscribers whose scope matches.
func (*SSEBroker) PublishOOBWithTTL ¶ added in v0.4.44
PublishOOBWithTTL renders the given fragments and publishes them with a TTL on the replay cache entry. See SSEBroker.PublishWithTTL for TTL semantics.
func (*SSEBroker) PublishThrottled ¶ added in v0.4.24
PublishThrottled publishes msg to the topic at most once per interval. The first call publishes immediately. Subsequent calls within the interval are held; when the interval elapses, the most recent held message is published (latest-wins). This guarantees bounded latency for the first message while rate-limiting subsequent ones. This method is safe for concurrent use.
Example ¶
ExampleSSEBroker_PublishThrottled demonstrates throttled publishing where the first message publishes immediately and subsequent messages within the interval are rate-limited.
package main
import (
"fmt"
"time"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.Subscribe("slider")
defer unsub()
// First call publishes immediately.
broker.PublishThrottled("slider", "first", 100*time.Millisecond)
msg := <-ch
fmt.Println(msg)
}
Output: first
func (*SSEBroker) PublishTo ¶ added in v0.4.0
PublishTo fans out msg only to scoped subscribers of the given topic whose scope matches. It is non-blocking: if a subscriber's channel buffer is full, the message is silently dropped. Publishing to a topic or scope with no matching subscribers is a no-op.
func (*SSEBroker) PublishToWithTTL ¶ added in v0.4.44
func (b *SSEBroker) PublishToWithTTL(topic, scope, msg string, ttl time.Duration, opts ...TTLOption)
PublishToWithTTL publishes msg to scoped subscribers with a TTL on the replay cache entry. See SSEBroker.PublishWithTTL for TTL semantics.
func (*SSEBroker) PublishWithID ¶ added in v0.4.25
PublishWithID publishes msg to the topic with an associated event ID. The message is cached in the replay log for Last-Event-ID resumption. The replay log size is controlled by SetReplayPolicy (default 1).
Example ¶
ExampleSSEBroker_PublishWithID demonstrates publishing messages with event IDs for Last-Event-ID resumption. When a client reconnects, it can resume from where it left off using SubscribeFromID.
package main
import (
"fmt"
"strings"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.SetReplayPolicy("orders", 10)
ch, unsub := broker.Subscribe("orders")
defer unsub()
broker.PublishWithID("orders", "evt-1", `{"order": "A"}`)
// Live subscriber receives the message with the injected id: field.
msg := <-ch
fmt.Println(extractData(msg))
}
// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {
var parts []string
for _, line := range strings.Split(msg, "\n") {
if !strings.HasPrefix(line, "id: ") {
parts = append(parts, line)
}
}
result := strings.Join(parts, "\n")
return strings.TrimSpace(result)
}
Output: {"order": "A"}
func (*SSEBroker) PublishWithReplay ¶ added in v0.4.19
PublishWithReplay behaves like SSEBroker.Publish but also caches msg so that future subscribers of the topic immediately receive it on connect. Only the most recent message per topic is retained. Use SSEBroker.ClearReplay to remove the cached message for a topic.
func (*SSEBroker) PublishWithTTL ¶ added in v0.4.44
PublishWithTTL behaves like SSEBroker.PublishWithReplay but marks the replay cache entry with a time-to-live. After the TTL expires, the entry is removed from the replay cache so new subscribers do not see stale messages. The message is delivered immediately to all current subscribers regardless of the TTL.
Example ¶
ExampleSSEBroker_PublishWithTTL demonstrates ephemeral messages that expire from the replay cache after a duration. Current subscribers receive the message immediately, but new subscribers who connect after the TTL elapses will not see it.
package main
import (
"fmt"
"time"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker(tavern.WithMessageTTLSweep(10 * time.Millisecond))
defer broker.Close()
broker.SetReplayPolicy("alerts", 10)
ch, unsub := broker.Subscribe("alerts")
defer unsub()
broker.PublishWithTTL("alerts", "temporary alert", 50*time.Millisecond)
msg := <-ch
fmt.Println(msg)
}
Output: temporary alert
func (*SSEBroker) RemoveTopic ¶ added in v0.4.46
RemoveTopic removes a topic from an existing subscriber identified by subscriberID. The subscriber stops receiving messages from the topic. Returns true if the topic was found and removed. Lifecycle hooks (OnLastUnsubscribe) fire if this was the last subscriber on the topic.
If sendControl is true, a control event with type "tavern-topics-changed" is sent on the subscriber's channel.
func (*SSEBroker) RemoveTopicForScope ¶ added in v0.4.46
RemoveTopicForScope removes a topic from all subscribers with the matching scope. Returns the number of subscribers that had the topic removed.
func (*SSEBroker) RunPublisher ¶ added in v0.4.18
func (b *SSEBroker) RunPublisher(ctx context.Context, fn PublisherFunc)
RunPublisher launches fn in a new goroutine with panic recovery. If fn panics, the panic is recovered and logged (when a logger is configured via WithLogger). The goroutine is tracked by the broker's internal wait group so that SSEBroker.Close blocks until all publishers have returned.
fn receives the provided context and should return when the context is cancelled. Callers typically pass a context derived from the application's shutdown signal.
func (*SSEBroker) SSEHandler ¶ added in v0.4.34
func (b *SSEBroker) SSEHandler(topic string, opts ...SSEHandlerOption) http.Handler
SSEHandler returns an http.Handler that streams SSE messages for the given topic. It handles Content-Type headers, Last-Event-ID resumption via SSEBroker.SubscribeFromID, and the streaming select loop.
The default writer calls fmt.Fprint followed by http.Flusher.Flush for each message. Override with WithSSEWriter for custom formatting.
// Standard library
mux.Handle("/sse/dashboard", broker.SSEHandler("dashboard"))
// Echo
e.GET("/sse/dashboard", echo.WrapHandler(broker.SSEHandler("dashboard")))
// Custom writer (e.g., htmx-go)
mux.Handle("/sse", broker.SSEHandler("events",
tavern.WithSSEWriter(func(w http.ResponseWriter, msg string) error {
return htmx.WriteSSE(w, msg)
}),
))
func (*SSEBroker) SetBundleOnReconnect ¶ added in v0.4.46
SetBundleOnReconnect configures whether replay messages should be bundled into a single SSE write when a subscriber reconnects with a Last-Event-ID for the given topic. Bundling reduces DOM swap churn on the client by delivering all missed messages as one write.
func (*SSEBroker) SetOrdered ¶ added in v0.4.50
SetOrdered marks or unmarks a topic as ordered. When a topic is ordered, concurrent publishes are serialized through a per-topic mutex so that all subscribers observe messages in the same order. Non-ordered topics (the default) have zero additional synchronization overhead. This method is safe for concurrent use.
Call with enabled=true before publishing to guarantee ordering. Call with enabled=false to remove the constraint. Toggling while publishes are in-flight is safe but ordering is only guaranteed while the topic is marked as ordered.
func (*SSEBroker) SetReplayGapPolicy ¶ added in v0.4.39
func (b *SSEBroker) SetReplayGapPolicy(topic string, strategy GapStrategy, snapshotFn func() string)
SetReplayGapPolicy configures the gap strategy and optional snapshot function for the given topic. When a subscriber reconnects with a Last-Event-ID that has rolled out of the replay log:
- GapSilent: no special action (default, backwards compatible).
- GapFallbackToSnapshot: call snapshotFn and deliver the result as the first message to the subscriber, preceded by a "event: tavern-replay-gap" control event.
The snapshotFn parameter is only used with GapFallbackToSnapshot and may be nil for other strategies.
Gap detection requires ID-backed replay state: the topic must receive messages via SSEBroker.PublishWithID (or variants like PublishWithTTL) so that a replay log with event IDs exists. Without ID-backed publishes, subscribers never receive event IDs and Last-Event-ID reconnection is not meaningful. Calling SetReplayGapPolicy on a topic that only uses plain SSEBroker.Publish has no effect at runtime.
If a *slog.Logger is configured via WithLogger, a warning is logged when this method is called for a topic that has no replay log entries and no external ReplayStore.
func (*SSEBroker) SetReplayPolicy ¶ added in v0.4.24
SetReplayPolicy sets how many recent messages to cache for replay on the given topic. New subscribers receive up to n cached messages in order before live messages. Use n=0 to disable replay for the topic.
Example ¶
ExampleSSEBroker_SetReplayPolicy demonstrates configuring replay so that new subscribers receive recently cached messages on connect.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.SetReplayPolicy("news", 3)
// Publish before any subscriber exists.
broker.PublishWithReplay("news", "headline-1")
broker.PublishWithReplay("news", "headline-2")
broker.PublishWithReplay("news", "headline-3")
// New subscriber receives the cached messages.
ch, unsub := broker.Subscribe("news")
defer unsub()
for i := 0; i < 3; i++ {
fmt.Println(<-ch)
}
}
Output: headline-1 headline-2 headline-3
func (*SSEBroker) SetRetry ¶ added in v0.4.34
SetRetry sends an SSE retry directive to all subscribers of the given topic, including both unscoped and scoped subscribers. The browser's EventSource stores this value and uses it for the next reconnect attempt. Call before SSEBroker.Close in a graceful shutdown sequence to prevent clients from thundering-herding against new pods.
func (*SSEBroker) SetRetryAll ¶ added in v0.4.34
SetRetryAll sends an SSE retry directive to all subscribers across all topics, including both unscoped and scoped subscribers. This is a convenience for graceful shutdown scenarios where every connected client should back off before reconnecting.
func (*SSEBroker) SetSimplifiedRenderer ¶ added in v0.4.46
SetSimplifiedRenderer registers a function that produces lightweight content for the given topic. When a subscriber is in the simplify tier, the renderer is applied to the message before delivery. If no renderer is registered, the original message is delivered with no transformation.
func (*SSEBroker) Stats ¶ added in v0.3.0
func (b *SSEBroker) Stats() BrokerStats
Stats returns a point-in-time BrokerStats snapshot. It is a convenience method that combines SSEBroker.TopicCounts, SSEBroker.SubscriberCount, and SSEBroker.PublishDrops into a single lock acquisition.
Example ¶
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
// Topic name conventions for dashboard and real-time UI applications. Your
// application may use any string as a topic name; these are provided as
// examples of consistent naming patterns.
const (
TopicSystemStats = "system-stats"
TopicActivityFeed = "activity-feed"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
_, unsub1 := broker.Subscribe(TopicSystemStats)
defer unsub1()
_, unsub2 := broker.Subscribe(TopicActivityFeed)
defer unsub2()
stats := broker.Stats()
fmt.Printf("topics=%d subscribers=%d drops=%d\n", stats.Topics, stats.Subscribers, stats.PublishDrops)
}
Output: topics=2 subscribers=2 drops=0
func (*SSEBroker) Subscribe ¶
Subscribe registers a new subscriber for the given topic and returns a read-only channel that will receive published messages, along with an unsubscribe function. The caller must invoke the returned function when done to release resources and close the channel. Calling the unsubscribe function more than once is safe and has no effect after the first call.
The returned channel is buffered (default capacity 10, configurable via WithBufferSize). If the subscriber does not drain the channel fast enough, messages will be dropped by SSEBroker.Publish.
func (*SSEBroker) SubscribeFromID ¶ added in v0.4.25
func (b *SSEBroker) SubscribeFromID(topic, lastEventID string) (msgs <-chan string, unsubscribe func())
SubscribeFromID subscribes to a topic and replays all cached messages with IDs after lastEventID. If lastEventID is empty, all cached messages are replayed (same as Subscribe). If lastEventID is not found in the replay log, no replay occurs (gap too large) and only live messages are delivered.
This implements the server side of the SSE Last-Event-ID resumption protocol. The HTTP handler should read the Last-Event-ID header from the request and pass it here.
Example ¶
ExampleSSEBroker_SubscribeFromID demonstrates replaying cached messages for a new subscriber when no Last-Event-ID is provided. When lastEventID is empty, all cached replay messages are delivered.
package main
import (
"fmt"
"strings"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
broker.SetReplayPolicy("chat", 5)
broker.PublishWithID("chat", "msg-1", "hello")
broker.PublishWithID("chat", "msg-2", "world")
// Empty lastEventID replays all cached messages.
ch, unsub := broker.SubscribeFromID("chat", "")
defer unsub()
m1 := <-ch
m2 := <-ch
fmt.Println(extractData(m1))
fmt.Println(extractData(m2))
}
// extractData is a test helper that extracts the raw data from a message
// that may contain injected SSE id: fields.
func extractData(msg string) string {
var parts []string
for _, line := range strings.Split(msg, "\n") {
if !strings.HasPrefix(line, "id: ") {
parts = append(parts, line)
}
}
result := strings.Join(parts, "\n")
return strings.TrimSpace(result)
}
Output: hello world
func (*SSEBroker) SubscribeFromIDWith ¶ added in v0.4.76
func (b *SSEBroker) SubscribeFromIDWith(topic, lastEventID string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())
SubscribeFromIDWith creates a composable resume-aware subscription. When lastEventID is empty it behaves like SSEBroker.SubscribeWith with replay-cache replay. When lastEventID is non-empty it replays messages from the ID-backed replay log after that ID, emits a tavern-reconnected control event, and handles gap fallback consistently with SSEBroker.SubscribeFromID.
Option semantics:
- SubWithFilter: applies uniformly to replay AND live messages. Control events (tavern-reconnected, tavern-replay-gap, tavern-replay-truncated) always bypass the filter.
- SubWithMeta: applied the same as live subscriptions.
- SubWithSnapshot: applied ONLY on fresh subscribe (lastEventID is empty); never delivered on successful resume. Gap-fallback snapshot is a separate mechanism configured via SSEBroker.SetReplayGapPolicy.
- SubWithRate: applied to LIVE delivery only. Replay messages are delivered directly and are NOT rate-limited.
- SubWithScope: sets the subscriber scope for live scope filtering. The replay log is scope-less, so any replayed messages are delivered regardless of scope. Note that SSEBroker.PublishWithID publishes only to unscoped subscribers, so a scoped resume subscriber will only receive live messages via scope-aware publish paths such as SSEBroker.PublishTo.
func (*SSEBroker) SubscribeGlob ¶ added in v0.4.46
func (b *SSEBroker) SubscribeGlob(pattern string) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeGlob registers a subscriber for all topics matching the given glob pattern and returns a channel that receives TopicMessage values tagged with the actual publish topic. The pattern uses "/" as the topic separator:
- "*" matches exactly one segment
- "**" matches zero or more segments (any depth)
The returned unsubscribe function removes the glob subscriber and closes the channel. It is safe to call more than once.
Example ¶
ExampleSSEBroker_SubscribeGlob demonstrates hierarchical topic patterns using glob subscriptions. The "*" wildcard matches a single segment and "**" matches zero or more segments.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
// Subscribe to all topics under "sensors/" with one level of nesting.
ch, unsub := broker.SubscribeGlob("sensors/*")
defer unsub()
broker.Publish("sensors/temperature", `{"value": 22.5}`)
broker.Publish("sensors/humidity", `{"value": 60}`)
// This won't match because "sensors/floor/1" has two segments after "sensors".
broker.Publish("sensors/floor/1", `{"value": 3}`)
msg1 := <-ch
msg2 := <-ch
fmt.Printf("topic=%s data=%s\n", msg1.Topic, msg1.Data)
fmt.Printf("topic=%s data=%s\n", msg2.Topic, msg2.Data)
}
Output: topic=sensors/temperature data={"value": 22.5} topic=sensors/humidity data={"value": 60}
func (*SSEBroker) SubscribeGlobScoped ¶ added in v0.4.46
func (b *SSEBroker) SubscribeGlobScoped(pattern, scope string) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeGlobScoped registers a scoped glob subscriber. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered.
func (*SSEBroker) SubscribeGlobWith ¶ added in v0.4.52
func (b *SSEBroker) SubscribeGlobWith(pattern string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeGlobWith subscribes to a glob pattern with composable options.
func (*SSEBroker) SubscribeMulti ¶ added in v0.4.34
func (b *SSEBroker) SubscribeMulti(topics ...string) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeMulti subscribes to multiple topics and returns a single channel that receives TopicMessage values tagged with their source topic. The returned unsubscribe function removes the subscriber from all topics at once. Each topic counts toward its own subscriber total (lifecycle hooks fire correctly).
This eliminates the need for reflect.Select when a single SSE connection serves multiple topics.
Example ¶
ExampleSSEBroker_SubscribeMulti demonstrates subscribing to multiple topics with a single channel. Each received message includes the source topic, eliminating the need for reflect.Select.
package main
import (
"fmt"
"sort"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
ch, unsub := broker.SubscribeMulti("cpu", "memory", "disk")
defer unsub()
broker.Publish("memory", "85%")
broker.Publish("cpu", "42%")
broker.Publish("disk", "67%")
// Collect all three messages.
msgs := make([]string, 3)
for i := 0; i < 3; i++ {
m := <-ch
msgs[i] = fmt.Sprintf("%s=%s", m.Topic, m.Data)
}
sort.Strings(msgs)
for _, m := range msgs {
fmt.Println(m)
}
}
Output: cpu=42% disk=67% memory=85%
func (*SSEBroker) SubscribeMultiFromID ¶ added in v0.4.76
func (b *SSEBroker) SubscribeMultiFromID(topics []string, lastEventID string) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeMultiFromID subscribes to multiple topics with shared Last-Event-ID replay/resume semantics and returns a single channel of TopicMessage values tagged with their source topic. Each topic independently replays from its ID-backed replay log after lastEventID (or from the current cache if lastEventID is empty), then continues with live messages. The returned unsubscribe function closes all inner subscriptions at once.
Semantics:
- The same lastEventID is applied uniformly to every topic. If a topic's replay log does not contain the ID, that topic's gap handling (reconnect callbacks, gap strategy) runs as configured via SetReplayGapPolicy.
- Ordering is not guaranteed across topics. Within a single topic, messages preserve their published order.
- Calling the returned unsubscribe closes all inner channels; the output channel is closed once all fan-in goroutines exit.
This is the multi-topic counterpart to SSEBroker.SubscribeFromID and mirrors SSEBroker.SubscribeMulti's fan-in structure.
func (*SSEBroker) SubscribeMultiWith ¶ added in v0.4.52
func (b *SSEBroker) SubscribeMultiWith(topics []string, opts ...SubscribeOption) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeMultiWith subscribes to multiple topics with composable options. Options like filter and rate are applied uniformly to all topics.
func (*SSEBroker) SubscribeMultiWithMeta ¶ added in v0.4.46
func (b *SSEBroker) SubscribeMultiWithMeta(meta SubscribeMeta, topics ...string) (msgs <-chan TopicMessage, unsubscribe func())
SubscribeMultiWithMeta subscribes to multiple topics with metadata and returns a managed multi-subscription that supports dynamic topic changes via SSEBroker.AddTopic and SSEBroker.RemoveTopic.
func (*SSEBroker) SubscribeScoped ¶ added in v0.4.0
SubscribeScoped registers a subscriber with a scope key for the given topic. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered. The returned unsubscribe function releases resources and closes the channel; it is safe to call more than once.
func (*SSEBroker) SubscribeScopedWithCoalescing ¶ added in v0.4.48
func (b *SSEBroker) SubscribeScopedWithCoalescing(topic, scope string) (msgs <-chan string, unsubscribe func())
SubscribeScopedWithCoalescing registers a scoped coalescing subscriber. Only messages published via SSEBroker.PublishTo with a matching scope are delivered, and rapid updates are coalesced to the latest value.
func (*SSEBroker) SubscribeScopedWithFilter ¶ added in v0.4.46
func (b *SSEBroker) SubscribeScopedWithFilter(topic, scope string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())
SubscribeScopedWithFilter registers a scoped subscriber with a predicate filter. Only messages published via SSEBroker.PublishTo with a matching scope AND passing the predicate are delivered. Non-matching messages are silently skipped.
func (*SSEBroker) SubscribeScopedWithRate ¶ added in v0.4.45
func (b *SSEBroker) SubscribeScopedWithRate(topic, scope string, rate Rate) (msgs <-chan string, unsubscribe func())
SubscribeScopedWithRate registers a scoped subscriber with per-subscriber rate limiting. See SSEBroker.SubscribeWithRate for rate-limiting semantics.
func (*SSEBroker) SubscribeWith ¶ added in v0.4.52
func (b *SSEBroker) SubscribeWith(topic string, opts ...SubscribeOption) (msgs <-chan string, unsubscribe func())
SubscribeWith creates a subscription using composable options. It combines the functionality of Subscribe, SubscribeScoped, SubscribeWithFilter, SubscribeWithRate, SubscribeWithMeta, and SubscribeWithSnapshot into a single call.
func (*SSEBroker) SubscribeWithCoalescing ¶ added in v0.4.48
SubscribeWithCoalescing registers a subscriber that automatically coalesces rapid updates. When multiple messages are published before the subscriber reads, only the latest value is delivered. This is ideal for high-frequency data like stock tickers or sensor readings where intermediate values are irrelevant.
Replaced (coalesced) messages do not count as drops in SSEBroker.PublishDrops. The coalescing channel uses an internal atomic pointer so updates are lock-free in the publish path.
The returned channel receives the latest message whenever a new value is available. Call the returned function to unsubscribe and close the channel. The unsubscribe function is safe to call more than once.
func (*SSEBroker) SubscribeWithFilter ¶ added in v0.4.46
func (b *SSEBroker) SubscribeWithFilter(topic string, predicate FilterPredicate) (msgs <-chan string, unsubscribe func())
SubscribeWithFilter registers a subscriber with a predicate filter for the given topic. Only messages for which the predicate returns true are delivered to the subscriber's channel. Non-matching messages are silently skipped and do not count toward drop counts or backpressure.
The predicate runs in the publish goroutine — keep it fast.
func (*SSEBroker) SubscribeWithMeta ¶ added in v0.4.34
func (b *SSEBroker) SubscribeWithMeta(topic string, meta SubscribeMeta) (msgs <-chan string, unsubscribe func())
SubscribeWithMeta registers a subscriber with optional metadata. Behaves like SSEBroker.Subscribe but attaches metadata queryable via SSEBroker.Subscribers.
func (*SSEBroker) SubscribeWithRate ¶ added in v0.4.45
func (b *SSEBroker) SubscribeWithRate(topic string, rate Rate) (msgs <-chan string, unsubscribe func())
SubscribeWithRate registers a subscriber with per-subscriber rate limiting. Messages published faster than the configured rate are held, and the most recent held message is delivered when the interval elapses (latest-wins). Rate limiting is per-subscriber and does not affect the publisher or other subscribers.
func (*SSEBroker) SubscribeWithSnapshot ¶ added in v0.4.34
func (b *SSEBroker) SubscribeWithSnapshot(topic string, snapshotFn func() string) (msgs <-chan string, unsubscribe func())
SubscribeWithSnapshot subscribes to a topic and immediately sends the result of snapshotFn as the first message before any live publishes. The snapshot function runs while the subscription is being registered, ensuring no messages are missed between the snapshot and live stream. If snapshotFn returns an empty string, no snapshot is sent.
This eliminates the dual-render pattern where page handlers and publishers independently render the same initial state.
func (*SSEBroker) SubscriberCount ¶ added in v0.3.0
SubscriberCount returns the total number of active subscribers across all topics, including both unscoped and scoped subscribers.
func (*SSEBroker) Subscribers ¶ added in v0.4.34
func (b *SSEBroker) Subscribers(topic string) []SubscriberInfo
Subscribers returns a snapshot of all active subscribers for the given topic. The returned slice is a copy and safe to read without synchronization.
func (*SSEBroker) TopicCounts ¶
TopicCounts returns a snapshot of the number of active subscribers per topic. The returned map is a copy and safe to read without synchronization. Counts include both unscoped and scoped subscribers.
func (*SSEBroker) UnsubscribeGlob ¶ added in v0.4.46
func (b *SSEBroker) UnsubscribeGlob(ch <-chan TopicMessage)
UnsubscribeGlob is a convenience alias: callers may pass the channel returned by [SubscribeGlob] but the idiomatic approach is to call the unsubscribe function returned alongside the channel. This method finds and removes the glob subscription associated with ch, closing the channel.
func (*SSEBroker) Use ¶ added in v0.4.41
func (b *SSEBroker) Use(mw Middleware)
Use registers global middleware that runs on every publish regardless of topic. Middleware executes in registration order (first registered = outermost wrapper). It must be called before any publishes; adding middleware while publishing is safe but the new middleware only takes effect on subsequent publishes.
Example ¶
ExampleSSEBroker_Use demonstrates global publish middleware. Middleware wraps every publish call and can transform messages, add logging, or swallow publishes entirely.
package main
import (
"fmt"
"strings"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
// Add a middleware that uppercases all messages.
broker.Use(func(next tavern.PublishFunc) tavern.PublishFunc {
return func(topic, msg string) {
next(topic, strings.ToUpper(msg))
}
})
ch, unsub := broker.Subscribe("events")
defer unsub()
broker.Publish("events", "hello world")
msg := <-ch
fmt.Println(msg)
}
Output: HELLO WORLD
func (*SSEBroker) UseTopics ¶ added in v0.4.41
func (b *SSEBroker) UseTopics(pattern string, mw Middleware)
UseTopics registers middleware that runs only when the topic matches the given pattern. Pattern matching uses simple wildcard rules:
- An asterisk (*) matches any sequence of characters within a single segment (between colons).
- A pattern without wildcards must match the topic exactly.
Examples: "orders:*" matches "orders:list" and "orders:detail" but not "orders:detail:item".
Example ¶
ExampleSSEBroker_UseTopics demonstrates topic-scoped middleware that only runs when the publish topic matches a pattern.
package main
import (
"fmt"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
// Add middleware only for topics matching "log:*".
broker.UseTopics("log:*", func(next tavern.PublishFunc) tavern.PublishFunc {
return func(topic, msg string) {
next(topic, "[LOG] "+msg)
}
})
logCh, unsub1 := broker.Subscribe("log:app")
defer unsub1()
dataCh, unsub2 := broker.Subscribe("data")
defer unsub2()
broker.Publish("log:app", "request handled")
broker.Publish("data", "raw value")
fmt.Println(<-logCh)
fmt.Println(<-dataCh)
}
Output: [LOG] request handled raw value
type SSEHandlerOption ¶ added in v0.4.34
type SSEHandlerOption func(*sseHandler)
SSEHandlerOption configures the SSE handler.
func WithMaxConnectionDuration ¶ added in v0.4.57
func WithMaxConnectionDuration(d time.Duration) SSEHandlerOption
WithMaxConnectionDuration sets a maximum lifetime for SSE connections. After the configured duration (plus 0-10% random jitter to prevent thundering herd), the handler sends a retry: directive and an SSE comment, then closes the connection. The browser's EventSource will automatically reconnect with Last-Event-ID, providing seamless resumption.
A zero or negative duration disables the limit.
func WithReconnectDelay ¶ added in v0.4.63
func WithReconnectDelay(delay time.Duration) SSEHandlerOption
WithReconnectDelay sets the SSE retry: value (in milliseconds) sent when a connection is closed due to WithMaxConnectionDuration. The browser's EventSource uses this value to determine how long to wait before reconnecting. A zero or negative value defaults to 1000ms.
func WithSSEWriter ¶ added in v0.4.34
func WithSSEWriter(fn SSEWriterFunc) SSEHandlerOption
WithSSEWriter overrides the default message writer. The provided function is called for each message and is responsible for writing to the response and flushing if needed. Use this to integrate with libraries like htmx-go or to add custom SSE formatting.
Example ¶
ExampleWithSSEWriter demonstrates a custom SSE writer that formats messages with a prefix before writing them to the HTTP response.
package main
import (
"fmt"
"net/http"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker()
defer broker.Close()
customWriter := tavern.WithSSEWriter(func(w http.ResponseWriter, msg string) error {
_, err := fmt.Fprintf(w, "data: %s\n\n", msg)
return err
})
handler := broker.SSEHandler("events", customWriter)
mux := http.NewServeMux()
mux.Handle("/sse", handler)
fmt.Println("custom writer handler registered")
}
Output: custom writer handler registered
type SSEMessage ¶
type SSEMessage struct {
// Event is the SSE event type (the "event:" field).
Event string
// Data is the event payload (the "data:" field).
Data string
// ID is the optional event identifier (the "id:" field). When set, the
// browser will send it back as Last-Event-ID on reconnection.
ID string
// Retry is the optional reconnection time in milliseconds (the "retry:"
// field). A zero value omits the field.
Retry int
}
SSEMessage represents a complete Server-Sent Event message conforming to the W3C SSE specification (https://html.spec.whatwg.org/multipage/server-sent-events.html). Use NewSSEMessage to create one with the required fields, then chain SSEMessage.WithID or SSEMessage.WithRetry for optional fields.
func NewSSEMessage ¶
func NewSSEMessage(event, data string) SSEMessage
NewSSEMessage creates an SSEMessage with the required event type and data payload. Use the builder methods SSEMessage.WithID and SSEMessage.WithRetry to set optional fields.
func (SSEMessage) String ¶
func (m SSEMessage) String() string
String formats the SSEMessage as wire-format SSE text, terminated by a double newline as required by the specification.
func (SSEMessage) WithID ¶
func (m SSEMessage) WithID(id string) SSEMessage
WithID returns a copy of the message with the given event ID set. The browser uses this ID for reconnection via the Last-Event-ID header.
func (SSEMessage) WithRetry ¶
func (m SSEMessage) WithRetry(ms int) SSEMessage
WithRetry returns a copy of the message with the reconnection time set to ms milliseconds. The browser will wait this long before attempting to reconnect after a connection loss.
type SSEWriterFunc ¶ added in v0.4.34
type SSEWriterFunc func(w http.ResponseWriter, msg string) error
SSEWriterFunc writes a message to the HTTP response. It is called for each message received from the subscriber channel. The default writer calls fmt.Fprint followed by Flush. Override with WithSSEWriter to use custom formatting (e.g., htmx-go).
type ScheduledPublisher ¶ added in v0.4.24
type ScheduledPublisher struct {
// contains filtered or unexported fields
}
ScheduledPublisher manages multiple named sections with independent update intervals. It ticks on a fast base interval (default 100ms), renders due sections into a shared buffer, and publishes one batched message per tick. It automatically skips rendering when no subscribers are connected to the topic. ScheduledPublisher is safe for concurrent use; sections can be registered while the publisher is running.
func (*ScheduledPublisher) Register ¶ added in v0.4.24
func (p *ScheduledPublisher) Register(name string, interval time.Duration, fn RenderFunc, opts ...SectionOptions)
Register adds a named section with an interval and render function. Sections are rendered in registration order.
func (*ScheduledPublisher) SetInterval ¶ added in v0.4.24
func (p *ScheduledPublisher) SetInterval(name string, interval time.Duration) bool
SetInterval changes the interval of a registered section at runtime. Returns false if the section name is not found.
func (*ScheduledPublisher) Start ¶ added in v0.4.24
func (p *ScheduledPublisher) Start(ctx context.Context)
Start begins the publish loop. It blocks until ctx is cancelled. Typically called via broker.RunPublisher(ctx, pub.Start) or go pub.Start(ctx).
type ScheduledPublisherOption ¶ added in v0.4.24
type ScheduledPublisherOption func(*ScheduledPublisher)
ScheduledPublisherOption configures the scheduled publisher.
func WithBaseTick ¶ added in v0.4.24
func WithBaseTick(d time.Duration) ScheduledPublisherOption
WithBaseTick sets the base tick interval. Default is 100ms.
type SectionOptions ¶ added in v0.4.42
type SectionOptions struct {
// CircuitBreaker enables circuit breaker protection for the section.
// When nil, the section renders normally without circuit breaker logic.
CircuitBreaker *CircuitBreakerConfig
}
SectionOptions configures optional behavior for a registered section. Pass as the last argument to ScheduledPublisher.Register.
type StreamSSEOption ¶ added in v0.4.74
type StreamSSEOption func(*streamSSEConfig)
StreamSSEOption configures StreamSSE behavior.
func WithStreamHeartbeat ¶ added in v0.4.74
func WithStreamHeartbeat(interval time.Duration) StreamSSEOption
WithStreamHeartbeat enables periodic SSE comment heartbeats while streaming. If interval is greater than zero, a ": keepalive\n\n" comment is written and flushed every interval to keep intermediaries (proxies, browsers) from closing idle connections. A zero or negative interval disables heartbeats.
Tavern's broker-level keepalive (set via WithKeepalive on the broker) emits comments to all subscribers; WithStreamHeartbeat is the per-connection equivalent for handlers built directly on StreamSSE.
func WithStreamSnapshot ¶ added in v0.4.74
func WithStreamSnapshot(fn func() string) StreamSSEOption
WithStreamSnapshot registers a snapshot function that is called once, after SSE headers are written and before any channel values are streamed. The returned string is written verbatim as the initial frame. Return an empty string to skip the snapshot without disabling streaming.
Use this to deliver server-rendered initial state to new subscribers.
func WithStreamWriter ¶ added in v0.4.74
func WithStreamWriter(fn SSEWriterFunc) StreamSSEOption
WithStreamWriter overrides the default SSE frame writer used by StreamSSE. The default writer calls fmt.Fprint followed by http.Flusher.Flush for each frame. Override to integrate with libraries like htmx-go or to add custom SSE formatting.
type SubscribeMeta ¶ added in v0.4.34
type SubscribeMeta struct {
// ID is an identifier for this subscriber (e.g., session ID, user ID).
ID string
// Meta is arbitrary key-value metadata.
Meta map[string]string
}
SubscribeMeta holds optional metadata for SSEBroker.SubscribeWithMeta. The ID is used for targeted operations like SSEBroker.Disconnect and SSEBroker.AddTopic.
type SubscribeOption ¶ added in v0.4.52
type SubscribeOption func(*subscribeConfig)
SubscribeOption configures a composable subscription created via SSEBroker.SubscribeWith, SSEBroker.SubscribeMultiWith, or SSEBroker.SubscribeGlobWith.
func SubWithFilter ¶ added in v0.4.52
func SubWithFilter(fn FilterPredicate) SubscribeOption
SubWithFilter attaches a filter predicate. Only messages for which the predicate returns true are delivered.
func SubWithMeta ¶ added in v0.4.52
func SubWithMeta(m SubscribeMeta) SubscribeOption
SubWithMeta attaches subscriber metadata queryable via SSEBroker.Subscribers.
func SubWithRate ¶ added in v0.4.52
func SubWithRate(r Rate) SubscribeOption
SubWithRate enables per-subscriber rate limiting.
func SubWithScope ¶ added in v0.4.52
func SubWithScope(scope string) SubscribeOption
SubWithScope sets the subscription scope. Only messages published via SSEBroker.PublishTo with a matching scope will be delivered.
func SubWithSnapshot ¶ added in v0.4.52
func SubWithSnapshot(fn func() string) SubscribeOption
SubWithSnapshot provides a snapshot function whose result is delivered as the first message before any live publishes.
type SubscriberInfo ¶ added in v0.4.34
type SubscriberInfo struct {
// ID is the caller-provided identifier (empty if not set).
ID string
// Topic is the topic this subscriber is on.
Topic string
// Scope is the scope key (empty for unscoped subscribers).
Scope string
// ConnectedAt is when the subscription was created.
ConnectedAt time.Time
// Meta is caller-provided key-value metadata.
Meta map[string]string
}
SubscriberInfo describes an active subscriber. Retrieve a snapshot of all subscribers for a topic via SSEBroker.Subscribers.
type TTLOption ¶ added in v0.4.44
type TTLOption func(*ttlConfig)
TTLOption configures optional behavior for TTL-based publish methods such as SSEBroker.PublishWithTTL and SSEBroker.PublishToWithTTL.
func WithAutoRemove ¶ added in v0.4.44
WithAutoRemove configures a TTL publish to automatically send an OOB delete fragment for the given element ID when the message expires from the replay cache. This removes the element from currently-connected clients' DOM.
Example ¶
ExampleWithAutoRemove demonstrates using WithAutoRemove to automatically send an OOB delete fragment when a TTL message expires.
package main
import (
"fmt"
"time"
"github.com/catgoose/tavern"
)
func main() {
broker := tavern.NewSSEBroker(tavern.WithMessageTTLSweep(10 * time.Millisecond))
defer broker.Close()
broker.SetReplayPolicy("toasts", 10)
ch, unsub := broker.Subscribe("toasts")
defer unsub()
broker.PublishWithTTL("toasts", "<div id=\"toast-1\">Notice</div>", 30*time.Millisecond,
tavern.WithAutoRemove("toast-1"),
)
msg := <-ch
fmt.Println(msg)
}
Output: <div id="toast-1">Notice</div>
type TopicMessage ¶ added in v0.4.34
type TopicMessage struct {
// Topic is the name of the topic the message was published to.
Topic string
// Data is the published message payload.
Data string
}
TopicMessage pairs a message with the topic it was published on. It is returned by multiplexed subscription methods such as SSEBroker.SubscribeMulti and SSEBroker.SubscribeGlob.
type TopicMetrics ¶ added in v0.4.31
type TopicMetrics struct {
// Published is the total number of messages successfully delivered to at
// least one subscriber on this topic.
Published int64
// Dropped is the total number of delivery failures (subscriber buffer full)
// on this topic.
Dropped int64
// PeakSubscribers is the highest number of concurrent subscribers observed
// on this topic since the broker was created.
PeakSubscribers int
}
TopicMetrics holds per-topic counters. All fields are cumulative since the broker was created (except PeakSubscribers which is a high-water mark).
type TopicObservability ¶ added in v0.4.46
type TopicObservability struct {
PublishLatency LatencyHistogram
SubscriberLag map[string]int // subscriberID -> buffer depth
ConnectionDurations []time.Duration
Throughput float64 // msgs/sec over last window
EvictionCount int64
}
TopicObservability holds observability data for a single topic. All fields are populated based on the features enabled in ObservabilityConfig; disabled features produce zero values.
Source Files
¶
- admission.go
- backend_integration.go
- backpressure.go
- batch.go
- broker.go
- circuit.go
- coalesce.go
- debounce.go
- filter.go
- gap.go
- glob.go
- group.go
- handler.go
- hooks.go
- message.go
- metrics.go
- middleware.go
- multi.go
- observability.go
- oob.go
- ordering.go
- publish_signal.go
- rate.go
- reconnect.go
- replay_store.go
- resume.go
- scheduled.go
- snapshot.go
- stream.go
- subscribe_options.go
- subscriber.go
- subscription_changes.go
- throttle.go
- ttl.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package backend defines the pluggable interface for cross-process fan-out in tavern.
|
Package backend defines the pluggable interface for cross-process fan-out in tavern. |
|
memory
Package memory provides an in-process Backend implementation for testing and single-instance deployments.
|
Package memory provides an in-process Backend implementation for testing and single-instance deployments. |
|
Package presence provides structured presence tracking built on top of a tavern SSE broker.
|
Package presence provides structured presence tracking built on top of a tavern SSE broker. |
|
Package taverntest provides test helpers for tavern-based applications.
|
Package taverntest provides test helpers for tavern-based applications. |
