Documentation
¶
Overview ¶
Package sse provides Server-Sent Events (SSE) support for real-time server-to-client streaming using Go's standard library.
Overview ¶
Server-Sent Events enable servers to push real-time updates to clients over a single HTTP connection. Unlike WebSockets, SSE is unidirectional (server to client) and works over standard HTTP.
This package provides:
- Connection management with automatic cleanup
- Broadcast hubs for multi-client scenarios
- Event replay for missed message recovery
- Spec-compliant event formatting
Basic Usage ¶
Create an SSE endpoint using the server helper:
app := zh.New(zh.Config{
Extensions: zh.ExtensionsConfig{
SSEProvider: sse.NewDefaultProvider(),
},
})
app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
stream, err := sse.New(w, r)
if err != nil {
return err
}
defer stream.Close()
for {
select {
case <-r.Context().Done():
return nil
case msg := <-messages:
stream.Send(sse.Event{Name: "message", Data: []byte(msg)})
}
}
})
Broadcasting with Hubs ¶
Use a Hub to manage multiple connections and broadcast to all clients:
hub := sse.NewHub()
app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
stream, err := sse.New(w, r)
if err != nil {
return err
}
hub.Register(r.Context(), stream)
defer hub.Unregister(r.Context(), stream)
<-r.Context().Done()
return nil
})
// Broadcast to all connected clients
hub.Broadcast(r.Context(), sse.Event{Name: "update", Data: []byte("hello")})
The Broadcaster interface allows custom implementations (e.g. Redis-backed) for cross-instance broadcasting. The built-in Hub satisfies this interface.
Hub also provides OnBroadcast and OnBroadcastTo hooks for observability or intercepting events before delivery.
Event Replay ¶
Enable clients to recover missed events using the Last-Event-ID header:
replayer := sse.NewMemoryReplayer(1000, time.Hour) // Keep 1000 events for 1 hour
app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
stream, err := sse.New(w, r)
if err != nil {
return err
}
defer stream.Close()
// Replay missed events
lastID := r.Header.Get("Last-Event-ID")
if lastID != "" {
replayer.Replay(lastID, stream.Send)
}
// Start listening for new events
for msg := range messages {
event := replayer.Store(sse.Event{Name: "message", Data: []byte(msg)})
stream.Send(event)
}
return nil
})
Custom Provider ¶
Implement the Provider interface to use a custom SSE implementation:
type MyProvider struct{}
func (p *MyProvider) New(w http.ResponseWriter, r *http.Request) (sse.Connection, error) {
return myCustomSSE(w, r)
}
app := zh.New(zh.Config{
Extensions: zh.ExtensionsConfig{
SSEProvider: &MyProvider{},
},
})
Event Format ¶
Events follow the SSE specification:
id: 123 event: message retry: 5000 data: Hello, World!
Use the Event struct to create events:
event := sse.Event{
ID: "123",
Name: "message",
Data: []byte("Hello, World!"),
Retry: 5 * time.Second,
}
Low-Level Writer ¶
For direct control, use Writer instead of the full Connection interface:
writer, err := sse.NewWriter(w, r)
if err != nil {
return err
}
writer.WriteEvent(sse.Event{Data: []byte("hello")})
Index ¶
- func IsClientDisconnected(r *http.Request) bool
- type Broadcaster
- type Connection
- type DefaultProvider
- type Event
- type Hub
- func (h *Hub) Broadcast(ctx context.Context, event Event)
- func (h *Hub) BroadcastTo(ctx context.Context, topic string, event Event)
- func (h *Hub) ConnectionCount() int
- func (h *Hub) Register(_ context.Context, s *SSE)
- func (h *Hub) Subscribe(_ context.Context, s *SSE, topic string)
- func (h *Hub) TopicCount(topic string) int
- func (h *Hub) Unregister(_ context.Context, s *SSE)
- func (h *Hub) Unsubscribe(_ context.Context, s *SSE, topic string)
- type MemoryReplayer
- type Provider
- type Replayer
- type SSE
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsClientDisconnected ¶
IsClientDisconnected checks if the client has disconnected. This checks if the request context is done.
Types ¶
type Broadcaster ¶ added in v0.87.0
type Broadcaster interface {
Register(ctx context.Context, s *SSE)
Unregister(ctx context.Context, s *SSE)
Subscribe(ctx context.Context, s *SSE, topic string)
Unsubscribe(ctx context.Context, s *SSE, topic string)
Broadcast(ctx context.Context, event Event)
BroadcastTo(ctx context.Context, topic string, event Event)
ConnectionCount() int
TopicCount(topic string) int
}
Broadcaster is the interface for SSE broadcast hubs. The built-in Hub satisfies this interface, and users can provide their own implementations (e.g. backed by Redis) for cross-instance broadcast.
type Connection ¶
type Connection interface {
// Send writes an event to the client.
// Returns error if the connection is closed or write fails.
Send(event Event) error
// SendComment sends a comment (heartbeat/keepalive).
// Comments are ignored by the client but keep connections alive through proxies.
SendComment(comment string) error
// Close signals the SSE connection is done.
// No further events should be sent after Close.
Close() error
// SetRetry sets the default reconnection time for this connection.
// Affects subsequent events without explicit Retry value.
SetRetry(d time.Duration) error
}
Connection represents an active Server-Sent Events connection. Users can implement this interface with their own SSE library, or use the built-in EventStream implementation.
type DefaultProvider ¶
type DefaultProvider struct{}
DefaultProvider implements Provider using the stdlib
func NewDefaultProvider ¶
func NewDefaultProvider() *DefaultProvider
NewDefaultProvider creates a new stdlib-based SSE provider.
func (*DefaultProvider) New ¶
func (p *DefaultProvider) New(w http.ResponseWriter, r *http.Request) (Connection, error)
New creates a new SSE connection using the stdlib implementation.
type Hub ¶
type Hub struct {
OnBroadcast func(event Event)
OnBroadcastTo func(topic string, event Event)
// contains filtered or unexported fields
}
Hub manages multiple SSE connections for broadcasting.
func (*Hub) Broadcast ¶
Broadcast sends an event to all registered connections. Connections that fail to receive the event are automatically unregistered.
func (*Hub) BroadcastTo ¶
BroadcastTo sends an event to all connections subscribed to a topic. Connections that fail to receive the event are automatically unregistered.
func (*Hub) ConnectionCount ¶
ConnectionCount returns the number of registered connections.
func (*Hub) TopicCount ¶
TopicCount returns the number of connections subscribed to a topic.
func (*Hub) Unregister ¶
Unregister removes an SSE connection from the hub.
type MemoryReplayer ¶
type MemoryReplayer struct {
// contains filtered or unexported fields
}
MemoryReplayer stores events in memory with a circular buffer. Events can be limited by max count and/or TTL.
func NewMemoryReplayer ¶
func NewMemoryReplayer(maxEvents int, ttl time.Duration) *MemoryReplayer
NewMemoryReplayer creates a new in-memory event replayer. maxEvents is the maximum number of events to keep (0 = unlimited). ttl is how long to keep events (0 = no expiration).
func (*MemoryReplayer) Replay ¶
Replay sends all events after the given ID to the provided send function.
func (*MemoryReplayer) Store ¶
func (r *MemoryReplayer) Store(event Event) Event
Store saves an event to the replay buffer with an auto-generated ID. Returns the event with the assigned ID so it can be used for broadcasting.
type Provider ¶
type Provider interface {
// New creates a new SSE connection from the request/response.
// Returns error if headers were already sent or SSE is not supported.
New(w http.ResponseWriter, r *http.Request) (Connection, error)
}
Provider creates SSE connections from HTTP requests. Implement this to provide custom SSE implementations.
type Replayer ¶
type Replayer interface {
// Store saves an event to the replay buffer and returns the event with assigned ID.
Store(event Event) Event
// Replay sends all events after the given ID to the provided send function.
// Returns the number of events replayed and any error.
Replay(afterID string, send func(Event) error) (int, error)
}
Replayer defines the interface for event replay storage. Implementations can use in-memory storage, Redis, databases, etc.
type SSE ¶
type SSE struct {
// contains filtered or unexported fields
}
SSE is the built-in SSE implementation using Go's standard library.
func New ¶
New creates a new SSE connection using stdlib. This sets the appropriate headers and prepares the connection for streaming.
func WithReplay ¶
WithReplay creates a new SSE connection and replays missed events if Last-Event-ID header is present. After replay completes, the connection is ready for new events.
func (*SSE) SendComment ¶
SendComment sends a comment (heartbeat/keepalive).
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer wraps an http.ResponseWriter to provide SSE capabilities. This is a lower-level helper for users who want to write SSE directly.
func NewWriter ¶
NewWriter creates a new Writer from an http.ResponseWriter. This sets SSE headers and prepares the connection.
func (*Writer) WriteComment ¶
WriteComment writes an SSE comment.
func (*Writer) WriteEvent ¶
WriteEvent writes an SSE event.