sse

v0.0.0-...-a3d6be6
Warning

This package is not in the latest version of its module.

Published: May 21, 2026 License: MIT Imports: 10 Imported by: 0

README

SSE

SSE is a minimal, zero-magic Server-Sent Events library for Go. It separates the protocol formatting from the HTTP transport, ensuring zero-allocation writes and explicit connection lifecycles. Built strictly on the standard library, it favors mechanical sympathy and composability over framework-like conveniences.

  • Zero-allocation writes — formats directly into the HTTP buffer without intermediate strings
  • Explicit concurrency — background heartbeats are context-aware and shut down cleanly
  • Standard library native — composes directly with io.Reader and http.ResponseWriter
  • Zero dependencies

Install

go get lowbit.dev/sse

An Event

An Event is a plain data struct:

type Event struct {
    Name       string
    ID         string
    Retry      time.Duration
    Data       string
    Extensions map[string]string
}

It holds pure data. The library does not assume your payload is JSON or any other format. If you need to send structured data, marshal it before assignment.
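
As a rough illustration of "marshal it before assignment", the sketch below encodes a struct as JSON and stores the result in Data. The Event type is a local copy of the struct shown above so the snippet compiles without the module; priceUpdate and newPriceEvent are hypothetical names invented for this example and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event mirrors the struct shown above so this snippet compiles on its own.
// In real code you would use sse.Event instead.
type Event struct {
	Name       string
	ID         string
	Retry      time.Duration
	Data       string
	Extensions map[string]string
}

// priceUpdate is a hypothetical payload type used only for illustration.
type priceUpdate struct {
	Symbol string  `json:"symbol"`
	Price  float64 `json:"price"`
}

// newPriceEvent marshals the payload to JSON and stores the text in Data.
func newPriceEvent(id string, p priceUpdate) (Event, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return Event{}, err
	}
	return Event{ID: id, Name: "price", Data: string(b)}, nil
}

func main() {
	evt, err := newPriceEvent("1", priceUpdate{Symbol: "ACME", Price: 12.5})
	if err != nil {
		panic(err)
	}
	fmt.Println(evt.Name, evt.Data)
}
```

The receiving side would apply the matching json.Unmarshal to Data after reading the event.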

Usage

Server

Wrap an http.ResponseWriter with an Emitter to safely manage concurrent writes and flushes. Pass the request context to the background heartbeat so it cleans up automatically when the client disconnects.

func StreamHandler(w http.ResponseWriter, r *http.Request) {
    emitter, err := sse.NewEmitter(w)
    if err != nil {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    // Keep the connection alive; automatically stops when r.Context() is canceled
    go emitter.ServeHeartbeats(r.Context(), 15*time.Second)

    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-r.Context().Done():
            return // Client disconnected
            
        case <-ticker.C:
            _, err := emitter.Emit(sse.Event{
                Name: "ping",
                Data: "pong",
            })
            if err != nil {
                return // Socket dead
            }
        }
    }
}

Client

Pass any io.Reader (like an http.Response.Body) to the Reader. The parsing loop is entirely in your control, allowing you to handle timeouts and cancellations via standard HTTP client contexts.

req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.com/stream", nil)
if err != nil {
    log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

reader := sse.NewReader(resp.Body)

for {
    event, err := reader.Read()
    if err != nil {
        if errors.Is(err, io.EOF) {
            // Clean disconnect
            break
        }
        // Handle error
        break
    }
    
    fmt.Printf("Event: %s, Data: %s\n", event.Name, event.Data)
}

Core Types

sse splits responsibilities across four types to avoid unnecessary heap allocations and to keep connection state separate from pure data.

Type     Role
Emitter  Safely manages concurrent writes, flushes, and background heartbeats for an http.ResponseWriter.
Writer   Handles the zero-allocation protocol formatting to any underlying io.Writer.
Reader   Wraps an io.Reader for allocation-efficient event parsing.
Event    The raw data container mapping directly to the SSE specification.

Errors

Error                    Effect
ErrStreamingUnsupported  Returned by NewEmitter if the provided http.ResponseWriter does not implement http.Flusher.
io.EOF                   Returned by Reader.Read() when the server sends a clean termination (FIN packet).

Both are sentinel values, so callers can test for them with errors.Is. If Emitter.Emit() fails, it returns the underlying network error from the standard library (e.g., broken pipe), indicating a dead connection that should be dropped.

Documentation

Overview

Example

Example demonstrates basic usage of the sse package: writing and reading an event.

package main

import (
	"bytes"
	"fmt"
	"time"

	"lowbit.dev/sse"
)

func main() {
	buf := &bytes.Buffer{}
	w := sse.NewWriter(buf)
	w.Write(sse.Event{
		ID:         "42",
		Name:       "message",
		Data:       "hello world",
		Retry:      2 * time.Second,
		Extensions: map[string]string{"foo": "bar"},
	})

	r := sse.NewReader(buf)
	evt, err := r.Read()
	if err != nil {
		panic(err)
	}

	fmt.Println(evt.ID, evt.Name, evt.Data, evt.Retry, evt.Extensions["foo"])
}
Output:
42 message hello world 2s bar

Variables

var ErrStreamingUnsupported = errors.New("streaming unsupported by client")

Types

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}
Example

ExampleEmitter demonstrates using Emitter to send events and heartbeats.

package main

import (
	"fmt"
	"net/http/httptest"

	"lowbit.dev/sse"
)

func main() {
	rw := httptest.NewRecorder()
	emitter, err := sse.NewEmitter(rw)
	if err != nil {
		panic(err)
	}

	emitter.Emit(sse.Event{ID: "1", Name: "tick", Data: "tock"})
	emitter.WriteHeartbeat()
	fmt.Print(rw.Body.String())
}
Output:
id: 1
event: tick
data: tock

:

func NewEmitter

func NewEmitter(w http.ResponseWriter) (*Emitter, error)

func (*Emitter) Emit

func (e *Emitter) Emit(event Event) (int, error)

func (*Emitter) ServeHeartbeats

func (e *Emitter) ServeHeartbeats(ctx context.Context, interval time.Duration)

ServeHeartbeats writes a heartbeat comment to the stream at the specified interval to prevent intermediate proxies from dropping the connection.

This method is synchronous: it blocks until the provided context is canceled or a network write fails, so callers should run it in a separate goroutine. When given the request context (req.Context()), it shuts down cleanly as soon as the request ends and leaks no resources.

Example

ExampleEmitter_ServeHeartbeats demonstrates ServeHeartbeats with context cancellation.

package main

import (
	"context"
	"fmt"
	"net/http/httptest"
	"strings"
	"time"

	"lowbit.dev/sse"
)

func main() {
	rw := httptest.NewRecorder()
	emitter, _ := sse.NewEmitter(rw)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	go emitter.ServeHeartbeats(ctx, 5*time.Millisecond)

	time.Sleep(15 * time.Millisecond)
	out := rw.Body.String()
	fmt.Print(strings.Count(out, ":\n\n")) // count heartbeats
}
Output:
2

func (*Emitter) WriteHeartbeat

func (e *Emitter) WriteHeartbeat() error

WriteHeartbeat sends a manual SSE comment to keep the connection alive.

func (*Emitter) WriteRetry

func (e *Emitter) WriteRetry(t time.Duration) (int, error)

WriteRetry sends a retry message that sets the client's reconnect wait time.

type Event

type Event struct {
	Name       string
	ID         string
	Retry      time.Duration
	Data       string
	Extensions map[string]string
}

Event represents a single SSE event.

Example

ExampleEvent demonstrates the Event struct.

package main

import (
	"fmt"
	"time"

	"lowbit.dev/sse"
)

func main() {
	e := sse.Event{
		ID:         "abc",
		Name:       "notice",
		Data:       "payload",
		Retry:      500 * time.Millisecond,
		Extensions: map[string]string{"x": "1"},
	}

	fmt.Println(e.ID, e.Name, e.Data, e.Retry, e.Extensions["x"])
}
Output:
abc notice payload 500ms 1

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads SSE events from an underlying io.Reader.

Example

ExampleReader demonstrates reading an SSE event from a stream.

package main

import (
	"fmt"
	"strings"

	"lowbit.dev/sse"
)

func main() {
	data := "id: 7\nevent: ping\ndata: pong\nretry: 1000\nfoo: bar\n\n"
	r := sse.NewReader(strings.NewReader(data))

	evt, err := r.Read()
	if err != nil {
		panic(err)
	}

	fmt.Println(evt.ID, evt.Name, evt.Data, evt.Retry, evt.Extensions["foo"])
}
Output:
7 ping pong 1s bar

func NewReader

func NewReader(r io.Reader) *Reader

NewReader initializes a new SSE reader. It wraps the provided io.Reader in a bufio.Reader for efficient line-by-line scanning.

func (*Reader) Read

func (r *Reader) Read() (Event, error)

Read blocks until a complete Event is parsed from the stream, or an error occurs. It handles heartbeats (comments) internally by ignoring them, ensuring the caller only receives actionable events.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}
Example

ExampleWriter demonstrates writing an SSE event to a stream.

package main

import (
	"bytes"
	"fmt"
	"time"

	"lowbit.dev/sse"
)

func main() {
	buf := &bytes.Buffer{}
	w := sse.NewWriter(buf)

	w.Write(sse.Event{
		ID:         "99",
		Name:       "update",
		Data:       "done",
		Retry:      3 * time.Second,
		Extensions: map[string]string{"extra": "val"},
	})

	fmt.Print(buf.String())
}
Output:
id: 99
event: update
retry: 3000
extra: val
data: done

func NewWriter

func NewWriter(w io.Writer) *Writer

func (*Writer) Write

func (w *Writer) Write(e Event) (int, error)
