sse

package module
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2025 License: Apache-2.0 Imports: 11 Imported by: 3

README ΒΆ

β–‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—
β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β•β•β•
β•šβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β•šβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–‘
β–‘β•šβ•β•β•β–ˆβ–ˆβ•—β–‘β•šβ•β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•β–‘β–‘
β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—
β•šβ•β•β•β•β•β•β–‘β•šβ•β•β•β•β•β•β–‘β•šβ•β•β•β•β•β•β•

Go Reference Go Report Card

A simple, optimized, and high-performance Server-Sent Events (SSE) client and server library for Go.

✨ Features

  • πŸš€ High Performance - Optimized memory usage with buffer pooling and zero-copy operations
  • πŸ”„ Automatic Ping - Keep connections alive with configurable ping timeouts
  • 🧹 Memory Efficient - Small memory footprint with automatic buffer recycling
  • 🌐 CORS Ready - Built-in CORS support for web applications
  • πŸ›‘οΈ Thread Safe - Concurrent-safe operations with atomic operations
  • πŸ“¦ Zero Dependencies - Pure Go implementation with no external dependencies
  • ⚑ Fast Parsing - Optimized SSE message parsing with minimal allocations

πŸ“¦ Installation

go get ella.to/sse

πŸš€ Quick Start

Server Example

package main

import (
    "fmt"
    "log"
    "net/http"
    "strconv"
    "time"

    "ella.to/sse"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    // Create pusher with 30-second ping timeout
    // Set to 0 to disable automatic ping messages
    pusher, err := sse.NewHttpPusher(w, 30*time.Second)
    if err != nil {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }
    defer pusher.Close()

    // Send 10 messages with 1-second intervals
    for i := 1; i <= 10; i++ {
        msg := sse.NewMessage(
            strconv.Itoa(i),                    // ID
            "notification",                     // Event type
            fmt.Sprintf("Message %d", i),       // Data
        )

        if err := pusher.Push(msg); err != nil {
            log.Printf("Error pushing message: %v", err)
            return
        }

        // Return message to pool for reuse
        sse.PutMessage(msg)
        
        time.Sleep(1 * time.Second)
    }
}

func main() {
    http.HandleFunc("/events", sseHandler)
    
    log.Println("SSE server starting on :8080")
    log.Println("Test with: curl -N http://localhost:8080/events")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Client Example

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"

    "ella.to/sse"
)

func main() {
    client := &http.Client{
        Timeout: 30 * time.Second,
    }

    req, err := http.NewRequest("GET", "http://localhost:8080/events", nil)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := client.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    receiver := sse.NewReceiver(resp.Body)

    // Use context with timeout for graceful shutdown
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    fmt.Println("Listening for SSE messages...")
    for {
        msg, err := receiver.Receive(ctx)
        if err != nil {
            log.Printf("Connection closed: %v", err)
            break
        }

        fmt.Printf("ID: %s, Event: %s, Data: %s\n", 
            msg.Id, msg.Event, msg.Data)
    }
}

πŸ”§ Advanced Usage

Custom Message Creation

// Method 1: Using constructor
msg := sse.NewMessage("msg-1", "update", "Hello World")

// Method 2: Using message pool (recommended for high-throughput)
msg := sse.GetMessage()
msg.SetMessage("msg-1", "update", "Hello World")
// Don't forget to return to pool when done
defer sse.PutMessage(msg)

// Method 3: Manual creation
msg := &sse.Message{
    Id:    "msg-1",
    Event: "update", 
    Data:  "Hello World",
}

Broadcasting to Multiple Clients

type Hub struct {
    clients map[sse.Pusher]bool
    mu      sync.RWMutex
}

func (h *Hub) AddClient(pusher sse.Pusher) {
    h.mu.Lock()
    h.clients[pusher] = true
    h.mu.Unlock()
}

func (h *Hub) RemoveClient(pusher sse.Pusher) {
    h.mu.Lock()
    delete(h.clients, pusher)
    h.mu.Unlock()
}

func (h *Hub) Broadcast(msg *sse.Message) {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    for client := range h.clients {
        if err := client.Push(msg); err != nil {
            // Handle client disconnection
            go h.RemoveClient(client)
        }
    }
}

Parsing Raw SSE Data

import "strings"

func parseSSEData(data string) {
    reader := strings.NewReader(data)
    ch := sse.Parse(reader)
    
    for msg := range ch {
        fmt.Printf("Received: %+v\n", msg)
        // Remember to return pooled messages
        sse.PutMessage(msg)
    }
}

πŸ“Š Performance

This library is optimized for high-performance scenarios:

  • Memory Pooling: Automatic reuse of message objects and buffers
  • Zero-Copy Operations: Minimal memory allocations during parsing
  • Concurrent-Safe: Lock-free operations where possible using atomic operations
  • Efficient Parsing: Optimized SSE protocol parsing with bufio.Scanner

Benchmarks

BenchmarkPushReceive-8        1000    1.2 ms/op     245 B/op    12 allocs/op
BenchmarkParseMessages-8      5000    0.3 ms/op      89 B/op     5 allocs/op
BenchmarkHighThroughput-8      100   15.4 ms/op    1024 B/op    45 allocs/op

πŸ› οΈ Configuration Options

Ping Timeout

// 30-second ping timeout (recommended)
pusher, _ := sse.NewHttpPusher(w, 30*time.Second)

// Disable ping messages
pusher, _ := sse.NewHttpPusher(w, 0)

CORS Headers

The library automatically sets appropriate CORS headers:

  • Access-Control-Allow-Origin: *
  • Access-Control-Allow-Headers: Cache-Control
  • Connection: keep-alive
  • Cache-Control: no-cache

πŸ“ Message Format

SSE messages follow the standard format:

id: message-id
event: event-type
data: message data

Comments (lines starting with :) are ignored by the parser.

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

View Source
const (
	Version = "0.1.2"
)

Variables ΒΆ

This section is empty.

Functions ΒΆ

func BatchParse ΒΆ added in v0.1.0

func BatchParse(r io.Reader, batchSize int) <-chan []*Message

BatchParse processes multiple messages in batches to reduce channel overhead

func FastParse ΒΆ added in v0.1.0

func FastParse(r io.Reader) <-chan *Message

FastParse provides an even more optimized parsing implementation that reduces allocations further by reusing more components

func NewHttpReceiver ΒΆ added in v0.1.3

func NewHttpReceiver(url string, opts ...interface{}) (*httpReceiver, error)

func NewRetryClient ΒΆ added in v0.1.3

func NewRetryClient(opts ...retryTransportOpt) (*http.Client, error)

NewRetryClient creates an HTTP client with retry logic and header injection

func Parse ΒΆ added in v0.0.4

func Parse(r io.Reader) <-chan *Message

func PutMessage ΒΆ added in v0.1.0

func PutMessage(msg *Message)

PutMessage returns a message to the pool after resetting it

func SetLogger ΒΆ added in v0.1.6

func SetLogger(l *slog.Logger)

func WithConnectionInitialDelay ΒΆ added in v0.1.3

func WithConnectionInitialDelay(delay time.Duration) httpReceiverOpt

func WithConnectionMaxDelay ΒΆ added in v0.1.3

func WithConnectionMaxDelay(delay time.Duration) httpReceiverOpt

func WithConnectionMaxRetries ΒΆ added in v0.1.3

func WithConnectionMaxRetries(maxRetries int) httpReceiverOpt

func WithHeaders ΒΆ added in v0.1.3

func WithHeaders(headers map[string]string) retryTransportOpt

func WithInitialDelay ΒΆ added in v0.1.3

func WithInitialDelay(delay time.Duration) retryTransportOpt

func WithMaxDelay ΒΆ added in v0.1.3

func WithMaxDelay(delay time.Duration) retryTransportOpt

func WithMaxRetries ΒΆ added in v0.1.3

func WithMaxRetries(maxRetries int) retryTransportOpt

Types ΒΆ

type Message ΒΆ added in v0.0.4

type Message struct {
	Id    string
	Event string
	Data  string
	// contains filtered or unexported fields
}

func GetMessage ΒΆ added in v0.1.0

func GetMessage() *Message

GetMessage gets a message from the pool

func NewMessage ΒΆ added in v0.0.8

func NewMessage(id, event, data string) *Message

func NewPingEvent ΒΆ added in v0.0.8

func NewPingEvent() *Message

func (*Message) Read ΒΆ added in v0.0.8

func (m *Message) Read(b []byte) (int, error)

func (*Message) Reset ΒΆ added in v0.1.0

func (m *Message) Reset()

Reset clears the message for reuse

func (*Message) SetMessage ΒΆ added in v0.1.0

func (m *Message) SetMessage(id, event, data string)

SetMessage sets all fields at once for efficient reuse

func (*Message) String ΒΆ added in v0.0.4

func (m *Message) String() string

func (*Message) Write ΒΆ added in v0.0.8

func (m *Message) Write(b []byte) (int, error)

type Pusher ΒΆ

type Pusher interface {
	Push(msg *Message) error
	Close() error
}

func NewHttpPusher ΒΆ added in v0.0.8

func NewHttpPusher(w http.ResponseWriter, timeout time.Duration) (Pusher, error)

func NewPushCloser ΒΆ added in v0.0.8

func NewPushCloser(push func(msg *Message) error, close func() error) Pusher

func NewPusher ΒΆ added in v0.0.4

func NewPusher(w io.Writer, timeout time.Duration) (Pusher, error)

type Receiver ΒΆ added in v0.0.4

type Receiver interface {
	Receive(ctx context.Context) (*Message, error)
}

func NewReceiver ΒΆ added in v0.0.4

func NewReceiver(rc io.Reader) Receiver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL