sse

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package sse implements Server-Sent Events (SSE) according to the text/event-stream specification.

SSE provides a simple, unidirectional protocol for sending real-time updates from server to client over HTTP. It's ideal for scenarios like live notifications, dashboards, and activity streams.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConnectionClosed is returned when attempting to send on a closed connection.
	ErrConnectionClosed = errors.New("sse: connection closed")

	// ErrNoFlusher is returned when http.ResponseWriter doesn't support flushing.
	// This usually indicates an incompatible HTTP server or proxy.
	ErrNoFlusher = errors.New("sse: ResponseWriter does not support flushing")
)

Common errors returned by Conn.

View Source
var (
	// ErrHubClosed is returned when attempting to use a closed hub.
	ErrHubClosed = errors.New("sse: hub closed")
)

Common errors returned by Hub.

Functions

func Comment

func Comment(text string) string

Comment creates an SSE comment for keep-alive or debugging.

Comments start with colon (:) and are ignored by clients. They're commonly used to keep the connection alive and prevent timeouts.

Example:

keepAlive := sse.Comment("keep-alive")
fmt.Println(keepAlive)
// Output:
// : keep-alive
//

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents an active SSE connection to a client.

Conn manages the lifecycle of a Server-Sent Events connection, handling event transmission, context cancellation, and graceful shutdown.

Example:

func handler(w http.ResponseWriter, r *http.Request) {
    conn, err := sse.Upgrade(w, r)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer conn.Close()

    // Send events
    conn.SendData("Hello, SSE!")
    conn.SendJSON(map[string]string{"status": "connected"})
}

func Upgrade

func Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error)

Upgrade upgrades an HTTP connection to SSE with the request's context.

It sets the necessary SSE headers, validates that the ResponseWriter supports flushing, and sends an initial connection comment.

The connection uses r.Context() for cancellation tracking.

Returns ErrNoFlusher if the ResponseWriter doesn't implement http.Flusher.

Example:

conn, err := sse.Upgrade(w, r)
if err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
}
defer conn.Close()

func UpgradeWithContext

func UpgradeWithContext(ctx context.Context, w http.ResponseWriter, _ *http.Request) (*Conn, error)

UpgradeWithContext upgrades an HTTP connection to SSE with a custom context.

This is useful when you need finer control over connection lifecycle independent of the HTTP request context.

Returns ErrNoFlusher if the ResponseWriter doesn't implement http.Flusher.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
conn, err := sse.UpgradeWithContext(ctx, w, r)

func (*Conn) Close

func (c *Conn) Close() error

Close closes the SSE connection.

It's safe to call Close multiple times. Subsequent calls are no-ops.

After Close, all Send operations will return ErrConnectionClosed.

Example:

defer conn.Close()

func (*Conn) Done

func (c *Conn) Done() <-chan struct{}

Done returns a channel that's closed when the connection is closed.

This is useful for coordinating shutdown with goroutines sending events.

Example:

go func() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            conn.SendData("ping")
        case <-conn.Done():
            return
        }
    }
}()

func (*Conn) Send

func (c *Conn) Send(event *Event) error

Send sends an Event to the client.

Returns ErrConnectionClosed if the connection is already closed.

Example:

event := sse.NewEvent("User logged in").
    WithType("notification").
    WithID("evt-123")
err := conn.Send(event)

func (*Conn) SendData

func (c *Conn) SendData(data string) error

SendData sends a simple data-only event to the client.

This is a convenience method equivalent to Send(NewEvent(data)).

Returns ErrConnectionClosed if the connection is already closed.

Example:

err := conn.SendData("Hello, World!")

func (*Conn) SendJSON

func (c *Conn) SendJSON(v any) error

SendJSON sends a JSON-encoded event to the client.

The value is marshaled to JSON using encoding/json/v2. If marshaling fails, the error is returned.

Returns ErrConnectionClosed if the connection is already closed.

Example:

user := map[string]string{"name": "Alice", "status": "online"}
err := conn.SendJSON(user)

type Event

type Event struct {
	// Type is the event type (optional).
	// If empty, client receives generic "message" event.
	// Maps to "event:" field in SSE format.
	Type string

	// ID is the event ID for Last-Event-ID tracking (optional).
	// Client sends this in reconnect via Last-Event-ID header.
	// Maps to "id:" field in SSE format.
	ID string

	// Data is the event data (required).
	// Can be single line or multi-line.
	// Maps to "data:" field(s) in SSE format.
	Data string

	// Retry is the reconnection time in milliseconds (optional).
	// Tells client how long to wait before reconnecting on disconnect.
	// Maps to "retry:" field in SSE format.
	Retry int
}

Event represents a Server-Sent Event.

An Event consists of optional type, ID, retry fields, and required data field. Events are serialized according to the text/event-stream specification.

Example:

event := sse.NewEvent("Hello, World!").
    WithType("greeting").
    WithID("msg-1").
    WithRetry(3000)

fmt.Println(event.String())
// Output:
// event: greeting
// id: msg-1
// retry: 3000
// data: Hello, World!
//

func NewEvent

func NewEvent(data string) *Event

NewEvent creates a new Event with the specified data.

The returned Event can be further customized using builder methods: WithType, WithID, WithRetry.

Example:

event := sse.NewEvent("User logged in")

func (*Event) String

func (e *Event) String() string

String serializes the Event to SSE text/event-stream format.

The format follows the SSE specification:

  • Each field starts with field name + colon + space
  • Multi-line data becomes multiple "data:" lines
  • Message ends with double newline (\n\n)

Example:

event := sse.NewEvent("line1\nline2").WithType("multiline")
fmt.Println(event.String())
// Output:
// event: multiline
// data: line1
// data: line2
//

func (*Event) WithID

func (e *Event) WithID(id string) *Event

WithID sets the event ID.

This is used for client reconnection tracking via Last-Event-ID header.

Example:

event := sse.NewEvent("data").WithID("msg-123")

func (*Event) WithRetry

func (e *Event) WithRetry(ms int) *Event

WithRetry sets the reconnection retry time in milliseconds.

Example:

event := sse.NewEvent("data").WithRetry(3000) // 3 seconds

func (*Event) WithType

func (e *Event) WithType(typ string) *Event

WithType sets the event type.

Example:

event := sse.NewEvent("data").WithType("notification")

type Hub

type Hub[T any] struct {
	// contains filtered or unexported fields
}

Hub manages broadcasting events to multiple SSE connections.

Hub[T] is a generic type that manages a pool of SSE connections and enables efficient broadcasting of typed events to all connected clients. It handles client registration, unregistration, and automatic cleanup of failed connections.

Example:

hub := sse.NewHub[string]()
go hub.Run()
defer hub.Close()

// Register connections
hub.Register(conn1)
hub.Register(conn2)

// Broadcast to all clients
hub.Broadcast("Hello, everyone!")

The Hub uses channels for thread-safe coordination and a select loop in Run() to handle concurrent registration, unregistration, and broadcasting operations.

func NewHub

func NewHub[T any]() *Hub[T]

NewHub creates a new Hub for broadcasting events of type T.

The returned Hub must be started by calling Run() in a goroutine before use. Always call Close() when done to properly shut down the hub.

Example:

hub := sse.NewHub[UserEvent]()
go hub.Run()
defer hub.Close()

func (*Hub[T]) Broadcast

func (h *Hub[T]) Broadcast(data T) error

Broadcast sends data to all connected clients.

The data will be converted to a string representation:

  • string: sent as-is
  • fmt.Stringer: String() method called
  • other types: JSON-encoded

Failed sends automatically remove the client from the hub.

Returns ErrHubClosed if the hub is already closed.

Example:

err := hub.Broadcast("Server restarting in 5 minutes")

func (*Hub[T]) BroadcastJSON

func (h *Hub[T]) BroadcastJSON(v any) error

BroadcastJSON sends a JSON-encoded value to all connected clients.

This is a convenience method for sending structured data. The value is marshaled to JSON and sent to all clients.

Returns an error if JSON marshaling fails or if the hub is closed.

Example:

event := UserEvent{ID: 1, Action: "login"}
err := hub.BroadcastJSON(event)

func (*Hub[T]) Clients

func (h *Hub[T]) Clients() int

Clients returns the number of currently connected clients.

This is safe to call concurrently with other Hub operations.

Example:

count := hub.Clients()
log.Printf("Active connections: %d", count)

func (*Hub[T]) Close

func (h *Hub[T]) Close() error

Close shuts down the hub and closes all client connections.

After Close, all operations on the hub will return ErrHubClosed. It's safe to call Close multiple times.

Example:

defer hub.Close()

func (*Hub[T]) Register

func (h *Hub[T]) Register(conn *Conn) error

Register adds a connection to the hub.

The connection will receive all future broadcasts until it's unregistered or fails to send.

Returns ErrHubClosed if the hub is already closed.

Example:

conn, err := sse.Upgrade(w, r)
if err != nil {
    return err
}
err = hub.Register(conn)

func (*Hub[T]) Run

func (h *Hub[T]) Run()

Run starts the hub's event loop.

Run processes client registration, unregistration, and broadcast operations. It should be called in a goroutine and will block until Close() is called.

Example:

hub := sse.NewHub[string]()
go hub.Run()

func (*Hub[T]) Unregister

func (h *Hub[T]) Unregister(conn *Conn) error

Unregister removes a connection from the hub.

The connection will be closed and removed from the broadcast list. It's safe to call Unregister multiple times for the same connection.

Returns ErrHubClosed if the hub is already closed.

Example:

err := hub.Unregister(conn)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL