sse

package
v0.91.0
Published: May 8, 2026 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package sse provides Server-Sent Events (SSE) support for real-time server-to-client streaming using Go's standard library.

Server-Sent Events enable servers to push real-time updates to clients over a single HTTP connection. Unlike WebSockets, SSE is unidirectional (server to client) and works over standard HTTP.

This package provides:

  • Connection management with automatic cleanup
  • Broadcast hubs for multi-client scenarios
  • Event replay for missed message recovery
  • Spec-compliant event formatting

Basic Usage

Create an SSE endpoint using the server helper:

app := zh.New(zh.Config{
    Extensions: zh.ExtensionsConfig{
        SSEProvider: sse.NewDefaultProvider(),
    },
})

app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
    stream, err := sse.New(w, r)
    if err != nil {
        return err
    }
    defer stream.Close()

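    // messages is assumed to be a <-chan string supplied by the application.
    for {
        select {
        case <-r.Context().Done():
            return nil // client disconnected
        case msg := <-messages:
            if err := stream.Send(sse.Event{Name: "message", Data: []byte(msg)}); err != nil {
                return err
            }
        }
    }
}))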

Broadcasting with Hubs

Use a Hub to manage multiple connections and broadcast to all clients:

hub := sse.NewHub()

app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
    stream, err := sse.New(w, r)
    if err != nil {
        return err
    }

    hub.Register(r.Context(), stream)
    defer hub.Unregister(r.Context(), stream)

    <-r.Context().Done()
    return nil
}))

// Broadcast to all connected clients. Outside a handler there is no request
// in scope, so pass any suitable context.
hub.Broadcast(context.Background(), sse.Event{Name: "update", Data: []byte("hello")})

The Broadcaster interface allows custom implementations (e.g. Redis-backed) for cross-instance broadcasting. The built-in Hub satisfies this interface.

Hub also provides OnBroadcast and OnBroadcastTo hooks for observability or intercepting events before delivery.

Event Replay

Enable clients to recover missed events using the Last-Event-ID header:

replayer := sse.NewMemoryReplayer(1000, time.Hour) // Keep 1000 events for 1 hour

app.GET("/events", zh.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
    stream, err := sse.New(w, r)
    if err != nil {
        return err
    }
    defer stream.Close()

    // Replay missed events
    if lastID := r.Header.Get("Last-Event-ID"); lastID != "" {
        if _, err := replayer.Replay(lastID, stream.Send); err != nil {
            return err
        }
    }

    // Start listening for new events
    for msg := range messages {
        event := replayer.Store(sse.Event{Name: "message", Data: []byte(msg)})
        if err := stream.Send(event); err != nil {
            return err
        }
    }
    return nil
}))

Custom Provider

Implement the Provider interface to use a custom SSE implementation:

type MyProvider struct{}

func (p *MyProvider) New(w http.ResponseWriter, r *http.Request) (sse.Connection, error) {
    return myCustomSSE(w, r) // myCustomSSE is a placeholder for your own constructor
}

app := zh.New(zh.Config{
    Extensions: zh.ExtensionsConfig{
        SSEProvider: &MyProvider{},
    },
})

Event Format

Events follow the SSE specification:

id: 123
event: message
retry: 5000
data: Hello, World!

Use the Event struct to create events:

event := sse.Event{
    ID:    "123",
    Name:  "message",
    Data:  []byte("Hello, World!"),
    Retry: 5 * time.Second,
}
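
To make the mapping between the struct and the wire format concrete, here is a minimal sketch of how an event with these fields could be serialised. The formatEvent helper and the local Event type are illustrative assumptions, not the package's own code; the sketch follows the SSE convention that retry is expressed in milliseconds and that multi-line data becomes one data: line per line.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"time"
)

// Event mirrors the fields of the documented sse.Event struct.
type Event struct {
	ID    string
	Name  string
	Data  []byte
	Retry time.Duration
}

// formatEvent is a hypothetical helper that renders an Event in SSE wire
// format. Empty optional fields are omitted.
func formatEvent(e Event) string {
	var b bytes.Buffer
	if e.ID != "" {
		b.WriteString("id: " + e.ID + "\n")
	}
	if e.Name != "" {
		b.WriteString("event: " + e.Name + "\n")
	}
	if e.Retry > 0 {
		// The retry field is an integer number of milliseconds.
		b.WriteString("retry: " + strconv.FormatInt(e.Retry.Milliseconds(), 10) + "\n")
	}
	// Each line of the payload gets its own data: field.
	for _, line := range bytes.Split(e.Data, []byte("\n")) {
		b.WriteString("data: ")
		b.Write(line)
		b.WriteByte('\n')
	}
	b.WriteByte('\n') // a blank line terminates the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent(Event{
		ID:    "123",
		Name:  "message",
		Data:  []byte("Hello, World!"),
		Retry: 5 * time.Second,
	}))
}
```

The blank line at the end matters: a client only dispatches an event once it sees the empty line that terminates it.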

Low-Level Writer

For direct control, use Writer instead of the full Connection interface:

writer, err := sse.NewWriter(w, r)
if err != nil {
    return err
}
if err := writer.WriteEvent(sse.Event{Data: []byte("hello")}); err != nil {
    return err
}

Functions

func IsClientDisconnected

func IsClientDisconnected(r *http.Request) bool

IsClientDisconnected reports whether the client has disconnected by checking whether the request context is done.
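
A non-blocking check of this kind can be written with a select on the request context. The sketch below is an assumption about the general technique, not the package's source; isDone and demo are hypothetical names, and httptest is used only to build a request.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// isDone reports whether the request context has been cancelled,
// without blocking the caller.
func isDone(r *http.Request) bool {
	select {
	case <-r.Context().Done():
		return true
	default:
		return false
	}
}

// demo checks the same request before and after its context is cancelled.
func demo() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	r := httptest.NewRequest(http.MethodGet, "/events", nil).WithContext(ctx)
	before = isDone(r)
	cancel() // simulates the client going away
	after = isDone(r)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after)
}
```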

Types

type Broadcaster added in v0.87.0

type Broadcaster interface {
	Register(ctx context.Context, s *SSE)
	Unregister(ctx context.Context, s *SSE)
	Subscribe(ctx context.Context, s *SSE, topic string)
	Unsubscribe(ctx context.Context, s *SSE, topic string)
	Broadcast(ctx context.Context, event Event)
	BroadcastTo(ctx context.Context, topic string, event Event)
	ConnectionCount() int
	TopicCount(topic string) int
}

Broadcaster is the interface for SSE broadcast hubs. The built-in Hub satisfies this interface, and users can provide their own implementations (e.g. backed by Redis) for cross-instance broadcast.

type Connection

type Connection interface {
	// Send writes an event to the client.
	// Returns error if the connection is closed or write fails.
	Send(event Event) error

	// SendComment sends a comment (heartbeat/keepalive).
	// Comments are ignored by the client but keep connections alive through proxies.
	SendComment(comment string) error

	// Close signals the SSE connection is done.
	// No further events should be sent after Close.
	Close() error

	// SetRetry sets the default reconnection time for this connection.
	// Affects subsequent events without explicit Retry value.
	SetRetry(d time.Duration) error
}

Connection represents an active Server-Sent Events connection. Users can implement this interface with their own SSE library, or use the built-in SSE implementation.

type DefaultProvider

type DefaultProvider struct{}

DefaultProvider implements Provider using the standard library.

func NewDefaultProvider

func NewDefaultProvider() *DefaultProvider

NewDefaultProvider creates a new stdlib-based SSE provider.

func (*DefaultProvider) New

New creates a new SSE connection using the stdlib implementation.

type Event

type Event struct {
	ID    string
	Name  string
	Data  []byte
	Retry time.Duration
}

Event represents a single SSE event.

type Hub

type Hub struct {
	OnBroadcast   func(event Event)
	OnBroadcastTo func(topic string, event Event)
	// contains filtered or unexported fields
}

Hub manages multiple SSE connections for broadcasting.

func NewHub

func NewHub() *Hub

NewHub creates a new SSE broadcast hub.

func (*Hub) Broadcast

func (h *Hub) Broadcast(ctx context.Context, event Event)

Broadcast sends an event to all registered connections. Connections that fail to receive the event are automatically unregistered.
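
The unregister-on-failure behaviour can be illustrated with a small stand-alone hub. Everything in this sketch (miniHub, sender, okConn, brokenConn) is hypothetical and simplified; the real Hub's internals are not shown in this documentation. For brevity the sketch holds the lock while sending, which a production hub may want to avoid.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sender is a local stand-in for the Send side of a connection.
type sender interface {
	Send(data string) error
}

// miniHub is a hypothetical, simplified hub; it is not the package's Hub.
type miniHub struct {
	mu    sync.Mutex
	conns map[sender]struct{}
}

func newMiniHub() *miniHub {
	return &miniHub{conns: make(map[sender]struct{})}
}

func (h *miniHub) register(s sender) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.conns[s] = struct{}{}
}

func (h *miniHub) count() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.conns)
}

// broadcast sends data to every connection and drops those whose Send fails.
func (h *miniHub) broadcast(data string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for s := range h.conns {
		if err := s.Send(data); err != nil {
			delete(h.conns, s) // deleting during range is allowed in Go
		}
	}
}

// okConn records what it receives.
type okConn struct{ got []string }

func (c *okConn) Send(data string) error {
	c.got = append(c.got, data)
	return nil
}

// brokenConn always fails, like a client that has gone away.
type brokenConn struct{ reason string }

func (c *brokenConn) Send(string) error { return errors.New(c.reason) }

func main() {
	h := newMiniHub()
	ok := &okConn{}
	h.register(ok)
	h.register(&brokenConn{reason: "client gone"})
	fmt.Println("before:", h.count())
	h.broadcast("hello")
	fmt.Println("after:", h.count(), ok.got)
}
```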

func (*Hub) BroadcastTo

func (h *Hub) BroadcastTo(ctx context.Context, topic string, event Event)

BroadcastTo sends an event to all connections subscribed to a topic. Connections that fail to receive the event are automatically unregistered.

func (*Hub) ConnectionCount

func (h *Hub) ConnectionCount() int

ConnectionCount returns the number of registered connections.

func (*Hub) Register

func (h *Hub) Register(_ context.Context, s *SSE)

Register adds an SSE connection to the hub.

func (*Hub) Subscribe

func (h *Hub) Subscribe(_ context.Context, s *SSE, topic string)

Subscribe adds an SSE connection to a topic.

func (*Hub) TopicCount

func (h *Hub) TopicCount(topic string) int

TopicCount returns the number of connections subscribed to a topic.

func (*Hub) Unregister

func (h *Hub) Unregister(_ context.Context, s *SSE)

Unregister removes an SSE connection from the hub.

func (*Hub) Unsubscribe

func (h *Hub) Unsubscribe(_ context.Context, s *SSE, topic string)

Unsubscribe removes an SSE connection from a topic.
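
Topic bookkeeping of the kind described by Subscribe, Unsubscribe and TopicCount is commonly a map from topic to a set of connections. The sketch below shows that idea under stated assumptions: topicIndex is a hypothetical type, and connections are represented by plain string IDs rather than *SSE values.

```go
package main

import (
	"fmt"
	"sync"
)

// topicIndex is a hypothetical topic-to-subscribers index.
type topicIndex struct {
	mu   sync.RWMutex
	subs map[string]map[string]struct{} // topic -> set of connection IDs
}

func newTopicIndex() *topicIndex {
	return &topicIndex{subs: make(map[string]map[string]struct{})}
}

// subscribe adds connID to topic, creating the topic on first use.
func (t *topicIndex) subscribe(connID, topic string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.subs[topic] == nil {
		t.subs[topic] = make(map[string]struct{})
	}
	t.subs[topic][connID] = struct{}{}
}

// unsubscribe removes connID from topic and drops empty topics.
func (t *topicIndex) unsubscribe(connID, topic string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.subs[topic], connID) // no-op if the topic does not exist
	if len(t.subs[topic]) == 0 {
		delete(t.subs, topic)
	}
}

// count returns the number of subscribers for topic.
func (t *topicIndex) count(topic string) int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.subs[topic])
}

func main() {
	idx := newTopicIndex()
	idx.subscribe("conn-a", "news")
	idx.subscribe("conn-b", "news")
	idx.unsubscribe("conn-a", "news")
	fmt.Println(idx.count("news"), idx.count("sports"))
}
```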

type MemoryReplayer

type MemoryReplayer struct {
	// contains filtered or unexported fields
}

MemoryReplayer stores events in memory with a circular buffer. Events can be limited by max count and/or TTL.

func NewMemoryReplayer

func NewMemoryReplayer(maxEvents int, ttl time.Duration) *MemoryReplayer

NewMemoryReplayer creates a new in-memory event replayer. maxEvents is the maximum number of events to keep (0 = unlimited). ttl is how long to keep events (0 = no expiration).
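
The interaction of maxEvents and ttl can be sketched with a small in-memory buffer. This is an illustration only: miniReplayer is hypothetical, it assumes numeric, monotonically increasing IDs (the package's actual ID scheme is not documented here), and it uses a trimmed slice instead of a true circular buffer.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

// Event is a reduced local copy of the documented Event struct.
type Event struct {
	ID   string
	Data []byte
}

type stored struct {
	seq   int
	event Event
	at    time.Time
}

// miniReplayer is a hypothetical replay buffer bounded by count and age.
type miniReplayer struct {
	mu     sync.Mutex
	max    int           // 0 = unlimited
	ttl    time.Duration // 0 = no expiration
	next   int
	events []stored
}

// Store assigns the next ID, appends the event and evicts the oldest
// entries once the buffer exceeds max.
func (r *miniReplayer) Store(e Event) Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	e.ID = strconv.Itoa(r.next)
	r.events = append(r.events, stored{seq: r.next, event: e, at: time.Now()})
	if r.max > 0 && len(r.events) > r.max {
		r.events = r.events[len(r.events)-r.max:]
	}
	return e
}

// Replay sends every retained, unexpired event with an ID greater than afterID.
func (r *miniReplayer) Replay(afterID string, send func(Event) error) (int, error) {
	after, err := strconv.Atoi(afterID)
	if err != nil {
		return 0, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	n := 0
	for _, s := range r.events {
		if s.seq <= after || (r.ttl > 0 && time.Since(s.at) > r.ttl) {
			continue
		}
		if err := send(s.event); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}

func main() {
	r := &miniReplayer{max: 2, ttl: time.Hour}
	for _, msg := range []string{"a", "b", "c"} {
		r.Store(Event{Data: []byte(msg)})
	}
	n, err := r.Replay("0", func(e Event) error {
		fmt.Println(e.ID, string(e.Data))
		return nil
	})
	fmt.Println(n, err)
}
```

With max set to 2, the first stored event is evicted, so a client that reconnects with a very old ID only receives the two most recent events.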

func (*MemoryReplayer) Replay

func (r *MemoryReplayer) Replay(afterID string, send func(Event) error) (int, error)

Replay sends all events after the given ID to the provided send function.

func (*MemoryReplayer) Store

func (r *MemoryReplayer) Store(event Event) Event

Store saves an event to the replay buffer with an auto-generated ID. Returns the event with the assigned ID so it can be used for broadcasting.

type Provider

type Provider interface {
	// New creates a new SSE connection from the request/response.
	// Returns error if headers were already sent or SSE is not supported.
	New(w http.ResponseWriter, r *http.Request) (Connection, error)
}

Provider creates SSE connections from HTTP requests. Implement this to provide custom SSE implementations.

type Replayer

type Replayer interface {
	// Store saves an event to the replay buffer and returns the event with assigned ID.
	Store(event Event) Event
	// Replay sends all events after the given ID to the provided send function.
	// Returns the number of events replayed and any error.
	Replay(afterID string, send func(Event) error) (int, error)
}

Replayer defines the interface for event replay storage. Implementations can use in-memory storage, Redis, databases, etc.

type SSE

type SSE struct {
	// contains filtered or unexported fields
}

SSE is the built-in SSE implementation using Go's standard library.

func New

func New(w http.ResponseWriter, r *http.Request) (*SSE, error)

New creates a new SSE connection using stdlib. This sets the appropriate headers and prepares the connection for streaming.

func WithReplay

func WithReplay(w http.ResponseWriter, r *http.Request, replayer Replayer) (*SSE, error)

WithReplay creates a new SSE connection and replays missed events if Last-Event-ID header is present. After replay completes, the connection is ready for new events.

func (*SSE) Close

func (s *SSE) Close() error

Close signals the SSE connection is done.

func (*SSE) Send

func (s *SSE) Send(event Event) error

Send writes an event to the client.

func (*SSE) SendComment

func (s *SSE) SendComment(comment string) error

SendComment sends a comment (heartbeat/keepalive).
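
A typical use of comments is a periodic heartbeat that stops when the client goes away. The sketch below shows one way to structure such a loop; heartbeat and runDemo are hypothetical names, and the sendComment parameter stands in for a method like SendComment so the example does not depend on the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// heartbeat calls sendComment on every tick until ctx is cancelled or a
// send fails. A failed send usually means the client has disconnected.
func heartbeat(ctx context.Context, every time.Duration, sendComment func(string) error) error {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
			if err := sendComment("keepalive"); err != nil {
				return err
			}
		}
	}
}

// runDemo simulates a client that disappears on the third heartbeat.
func runDemo() (int, error) {
	sent := 0
	err := heartbeat(context.Background(), time.Millisecond, func(string) error {
		sent++
		if sent == 3 {
			return errors.New("client gone")
		}
		return nil
	})
	return sent, err
}

func main() {
	sent, err := runDemo()
	fmt.Println(sent, err)
}
```

In a real handler the loop would normally run in its own goroutine with the request context, using an interval shorter than any proxy idle timeout in the path.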

func (*SSE) SetRetry

func (s *SSE) SetRetry(d time.Duration) error

SetRetry sets the default reconnection time for this connection.

func (*SSE) WaitDone

func (s *SSE) WaitDone()

WaitDone blocks until the monitor goroutine exits. This is primarily used for testing to verify goroutine cleanup.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer wraps an http.ResponseWriter to provide SSE capabilities. This is a lower-level helper for users who want to write SSE directly.

func NewWriter

func NewWriter(w http.ResponseWriter, r *http.Request) (*Writer, error)

NewWriter creates a new Writer from an http.ResponseWriter. This sets SSE headers and prepares the connection.
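
For readers curious what "sets SSE headers and prepares the connection" usually involves, here is a standard-library sketch. The exact headers NewWriter sets are not listed in this documentation, so the ones below are an assumption based on common SSE practice; startStream and demo are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// startStream prepares w for event streaming. It fails if the
// ResponseWriter cannot flush, since SSE needs incremental delivery.
func startStream(w http.ResponseWriter) (http.Flusher, error) {
	f, ok := w.(http.Flusher)
	if !ok {
		return nil, errors.New("streaming not supported by this ResponseWriter")
	}
	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	w.WriteHeader(http.StatusOK)
	f.Flush() // push the headers to the client immediately
	return f, nil
}

// demo writes one event to an in-memory recorder and returns what a
// client would observe.
func demo() (contentType, body string, err error) {
	rec := httptest.NewRecorder()
	f, err := startStream(rec)
	if err != nil {
		return "", "", err
	}
	fmt.Fprint(rec, "data: hello\n\n")
	f.Flush()
	return rec.Header().Get("Content-Type"), rec.Body.String(), nil
}

func main() {
	ct, body, err := demo()
	fmt.Printf("%q %q %v\n", ct, body, err)
}
```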

func (*Writer) Flush

func (s *Writer) Flush()

Flush flushes the underlying writer.

func (*Writer) WriteComment

func (s *Writer) WriteComment(comment string) error

WriteComment writes an SSE comment.

func (*Writer) WriteEvent

func (s *Writer) WriteEvent(event Event) error

WriteEvent writes an SSE event.
