streaming

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 11 Imported by: 0

README

Streaming Extension

Real-time streaming extension for Forge v2 with WebSocket/SSE support, rooms, channels, presence tracking, typing indicators, and distributed coordination.

Features

  • WebSocket & SSE - Built on top of Forge's core router streaming support
  • Rooms - Create chat rooms with members, roles, and permissions
  • Channels - Pub/sub channels with filters and subscriptions
  • Presence - Track online/offline/away status across users
  • Typing Indicators - Real-time typing indicators per room
  • Message History - Persist and retrieve message history
  • Distributed - Redis/NATS backends for multi-node deployments
  • Interface-First - All major components are interfaces for testability

Installation

import "github.com/xraph/forge/extensions/streaming"

Quick Start

Basic Setup (Local Backend)
package main

import (
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    // Create container and app
    container := forge.NewContainer()
    app := forge.NewApp(container)
    router := forge.NewRouter(forge.WithContainer(container))

    // Create streaming extension with local backend
    streamExt := streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
    )

    // Register and start
    app.Use(streamExt)
    app.Start(context.Background())

    // Register streaming routes
    streamExt.RegisterRoutes(router, "/ws", "/sse")

    // Start server
    http.ListenAndServe(":8080", router)
}
With Redis Backend (Distributed)
streamExt := streaming.NewExtension(
    streaming.WithRedisBackend("redis://localhost:6379"),
    streaming.WithFeatures(true, true, true, true, true),
    streaming.WithNodeID("node-1"), // Optional, auto-generated if not set
)

Architecture

Interface-First Design

All major components are defined as interfaces with multiple implementations:

streaming (interfaces)
├── Manager              - Central orchestrator
├── EnhancedConnection   - WebSocket connection with metadata
├── Room                 - Room management
├── RoomStore           - Room persistence backend
├── Channel             - Pub/sub channel
├── ChannelStore        - Channel persistence backend
├── PresenceTracker     - Presence tracking
├── PresenceStore       - Presence persistence backend
├── TypingTracker       - Typing indicators
├── TypingStore         - Typing persistence backend
├── MessageStore        - Message history
└── DistributedBackend  - Cross-node coordination
Package Structure
v2/extensions/streaming/
├── streaming.go         # Core interfaces (Manager, EnhancedConnection)
├── room.go             # Room interfaces
├── channel.go          # Channel interfaces
├── presence.go         # Presence interfaces
├── typing.go           # Typing interfaces
├── persistence.go      # Message store interfaces
├── distributed.go      # Distributed backend interfaces
├── config.go           # Configuration
├── errors.go           # Domain errors
├── manager.go          # Manager implementation
├── connection.go       # Enhanced connection implementation
├── extension.go        # Extension entry point
├── backends/
│   ├── factory.go      # Store factory
│   ├── local/          # In-memory implementations
│   ├── redis/          # Redis implementations (TODO)
│   └── nats/           # NATS implementations (TODO)
└── trackers/
    ├── presence_tracker.go  # Presence tracker implementation
    └── typing_tracker.go    # Typing tracker implementation

Usage Examples

Custom WebSocket Handler
router.WebSocket("/chat", func(ctx forge.Context, conn forge.Connection) error {
    // Get streaming manager from DI
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    // Get user from auth
    userID := ctx.Get("user_id").(string)

    // Create enhanced connection
    enhanced := streaming.NewEnhancedConnection(conn)
    enhanced.SetUserID(userID)
    enhanced.SetSessionID(uuid.New().String())

    // Register
    manager.Register(enhanced)
    defer manager.Unregister(conn.ID())

    // Set online
    manager.SetPresence(ctx.Request().Context(), userID, streaming.StatusOnline)
    defer manager.SetPresence(ctx.Request().Context(), userID, streaming.StatusOffline)

    // Message loop
    for {
        var msg streaming.Message
        if err := conn.ReadJSON(&msg); err != nil {
            return err
        }

        // Handle message
        switch msg.Type {
        case streaming.MessageTypeMessage:
            if msg.RoomID != "" {
                manager.BroadcastToRoom(ctx.Request().Context(), msg.RoomID, &msg)
            }
        case streaming.MessageTypeJoin:
            manager.JoinRoom(ctx.Request().Context(), conn.ID(), msg.RoomID)
        case streaming.MessageTypeLeave:
            manager.LeaveRoom(ctx.Request().Context(), conn.ID(), msg.RoomID)
        }
    }
})
Room Management REST API
api := router.Group("/api/v1")

// Create room
api.POST("/rooms", func(ctx forge.Context, req *CreateRoomRequest) error {
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    userID := ctx.Get("user_id").(string)

    room := streaming.RoomOptions{
        ID:          uuid.New().String(),
        Name:        req.Name,
        Description: req.Description,
        Owner:       userID,
    }

    if err := manager.CreateRoom(ctx.Request().Context(), room); err != nil {
        return err
    }

    return ctx.JSON(200, room)
})

// Get room history
api.GET("/rooms/:id/history", func(ctx forge.Context) error {
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    roomID := ctx.Param("id")

    messages, err := manager.GetHistory(ctx.Request().Context(), roomID, streaming.HistoryQuery{
        Limit: 100,
    })
    if err != nil {
        return err
    }

    return ctx.JSON(200, messages)
})

Configuration

Complete Configuration Example
streamExt := streaming.NewExtension(
    // Backend
    streaming.WithBackend("redis"),
    streaming.WithBackendURLs("redis://localhost:6379"),
    streaming.WithAuthentication("username", "password"),

    // Features
    streaming.WithFeatures(true, true, true, true, true),

    // Limits
    streaming.WithConnectionLimits(5, 50, 100), // conns/user, rooms/user, channels/user
    streaming.WithMessageLimits(64*1024, 100),  // max size, max/second

    // Timeouts
    streaming.WithTimeouts(30*time.Second, 10*time.Second, 10*time.Second), // ping, pong, write

    // Retention
    streaming.WithMessageRetention(30 * 24 * time.Hour), // 30 days

    // Distributed
    streaming.WithNodeID("node-1"),

    // TLS
    streaming.WithTLS("cert.pem", "key.pem", "ca.pem"),
)
Configuration from File
# config.yaml
extensions:
  streaming:
    backend: redis
    backend_urls:
      - redis://localhost:6379
    enable_rooms: true
    enable_channels: true
    enable_presence: true
    enable_typing_indicators: true
    enable_message_history: true
    max_connections_per_user: 5
    max_rooms_per_user: 50
    max_message_size: 65536
    message_retention: 720h # 30 days
// Automatically loads from config
streamExt := streaming.NewExtension()

Message Protocol

Message Structure
type Message struct {
    ID        string         `json:"id"`
    Type      string         `json:"type"`      // "message", "presence", "typing", "system"
    Event     string         `json:"event,omitempty"`
    RoomID    string         `json:"room_id,omitempty"`
    ChannelID string         `json:"channel_id,omitempty"`
    UserID    string         `json:"user_id"`
    Data      any            `json:"data"`
    Metadata  map[string]any `json:"metadata,omitempty"`
    Timestamp time.Time      `json:"timestamp"`
    ThreadID  string         `json:"thread_id,omitempty"`
}
Message Types
  • MessageTypeMessage - Regular chat message
  • MessageTypePresence - Presence update (online/offline/away)
  • MessageTypeTyping - Typing indicator
  • MessageTypeSystem - System notification
  • MessageTypeJoin - User joined room
  • MessageTypeLeave - User left room
  • MessageTypeError - Error message

Backend Comparison

Feature Local Redis NATS
Single Node
Multi-Node
Persistence Memory Disk Disk
Message History Limited Full Full
Presence Sync
Performance Fastest Fast Fastest
Setup None Redis NATS Server

Production Considerations

Scaling

For distributed deployments:

  1. Use Redis or NATS backend
  2. Set unique node IDs per instance
  3. Configure proper timeouts and limits
  4. Enable message persistence
  5. Monitor metrics
Security
  • Always use authentication middleware before WebSocket routes
  • Validate user permissions for room/channel access
  • Rate limit connections per user
  • Use TLS in production
  • Never log sensitive message content
Monitoring

Key metrics to monitor:

  • streaming.connections.active - Active connections
  • streaming.connections.total - Total connections created
  • streaming.messages.broadcast - Messages broadcast
  • streaming.rooms.joins - Room joins
  • streaming.presence.updates - Presence updates
Health Checks
// Extension provides health check
if err := streamExt.Health(ctx); err != nil {
    log.Error("streaming unhealthy", err)
}

Testing

Mock Implementations

All interfaces can be easily mocked for testing:

type mockManager struct {
    streaming.Manager
    registerCalls int
}

func (m *mockManager) Register(conn streaming.EnhancedConnection) error {
    m.registerCalls++
    return nil
}

func TestMyHandler(t *testing.T) {
    manager := &mockManager{}
    // Test with mock manager
}
Integration Tests
// Use local backend for tests
streamExt := streaming.NewExtension(streaming.WithLocalBackend())
// Run tests against real implementation

Roadmap

  • Redis backend implementation
  • NATS backend implementation
  • Message compression for old messages
  • Advanced filtering and search
  • WebRTC signaling support
  • GraphQL subscriptions integration
  • Admin dashboard for monitoring

License

Part of Forge v2 framework.

Documentation

Index

Constants

View Source
const MessageTypeJoin = internal.MessageTypeJoin
View Source
const MessageTypeLeave = internal.MessageTypeLeave
View Source
const MessageTypeMessage = internal.MessageTypeMessage

Message type constants

View Source
const MessageTypePresence = internal.MessageTypePresence
View Source
const MessageTypeTyping = internal.MessageTypeTyping
View Source
const StatusAway = internal.StatusAway
View Source
const StatusBusy = internal.StatusBusy
View Source
const StatusOffline = internal.StatusOffline
View Source
const StatusOnline = internal.StatusOnline

Status constants

Variables

View Source
var DefaultPresenceOptions = internal.DefaultPresenceOptions

Default option functions

View Source
var DefaultTypingOptions = internal.DefaultTypingOptions
View Source
var ErrAlreadyRoomMember = internal.ErrAlreadyRoomMember
View Source
var ErrAlreadySubscribed = internal.ErrAlreadySubscribed
View Source
var ErrBackendNotFound = internal.ErrBackendNotConnected
View Source
var ErrBackendTimeout = internal.ErrBackendTimeout
View Source
var ErrBackendUnavailable = internal.ErrBackendUnavailable
View Source
var ErrChannelAlreadyExists = internal.ErrChannelAlreadyExists
View Source
var ErrChannelNotFound = internal.ErrChannelNotFound

Channel errors

View Source
var ErrConnectionClosed = internal.ErrConnectionClosed
View Source
var ErrConnectionLimitReached = internal.ErrConnectionLimitReached
View Source
var ErrConnectionNotFound = internal.ErrConnectionNotFound

Connection errors

View Source
var ErrInsufficientRole = internal.ErrInsufficientRole
View Source
var ErrInvalidChannel = internal.ErrInvalidChannel
View Source
var ErrInvalidConfig = internal.ErrInvalidConfig
View Source
var ErrInvalidConnection = internal.ErrInvalidConnection
View Source
var ErrInvalidMessage = internal.ErrInvalidMessage
View Source
var ErrInvalidPermission = internal.ErrInvalidPermission
View Source
var ErrInvalidRoom = internal.ErrInvalidRoom
View Source
var ErrInvalidStatus = internal.ErrInvalidStatus
View Source
var ErrInviteExpired = internal.ErrInviteExpired
View Source
var ErrInviteNotFound = internal.ErrInviteNotFound

Invite errors

View Source
var ErrLockAcquisitionFailed = internal.ErrLockAcquisitionFailed
View Source
var ErrLockNotHeld = internal.ErrLockNotHeld
View Source
var ErrMessageNotFound = internal.ErrMessageNotFound
View Source
var ErrMessageTooLarge = internal.ErrMessageTooLarge
View Source
var ErrNodeNotFound = internal.ErrNodeNotFound
View Source
var ErrNotRoomMember = internal.ErrNotRoomMember
View Source
var ErrNotSubscribed = internal.ErrNotSubscribed
View Source
var ErrPermissionDenied = internal.ErrPermissionDenied

Permission errors

View Source
var ErrPresenceNotFound = internal.ErrPresenceNotFound
View Source
var ErrRoomAlreadyExists = internal.ErrRoomAlreadyExists
View Source
var ErrRoomFull = internal.ErrRoomFull
View Source
var ErrRoomLimitReached = internal.ErrRoomLimitReached
View Source
var ErrRoomNotFound = internal.ErrRoomNotFound

Room errors

View Source
var NewBackendError = internal.NewBackendError
View Source
var NewChannelError = internal.NewChannelError
View Source
var NewConnectionError = internal.NewConnectionError

Error constructors

View Source
var NewMessageError = internal.NewMessageError
View Source
var NewRoomError = internal.NewRoomError

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new streaming extension with functional options.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new streaming extension with a complete config.

func NewLocalRoom

func NewLocalRoom(opts RoomOptions) *local.LocalRoom

Room creation

Types

type ActivityInfo

type ActivityInfo = internal.ActivityInfo

type AnalyticsEvent

type AnalyticsEvent = internal.AnalyticsEvent

type AnalyticsQuery

type AnalyticsQuery = internal.AnalyticsQuery

type AnalyticsResult

type AnalyticsResult = internal.AnalyticsResult

Analytics types

type Availability

type Availability = internal.Availability

type BackendError

type BackendError = internal.BackendError

Backend error

type Channel

type Channel = internal.Channel

type ChannelStore

type ChannelStore = internal.ChannelStore

type Config

type Config = internal.Config

Configuration

func DefaultConfig

func DefaultConfig() Config

type ConfigOption

type ConfigOption = internal.ConfigOption

func WithAuthentication

func WithAuthentication(username, password string) ConfigOption

func WithBackend

func WithBackend(backend string) ConfigOption

func WithBackendURLs

func WithBackendURLs(urls ...string) ConfigOption

func WithBufferSizes

func WithBufferSizes(read, write int) ConfigOption

func WithConfig

func WithConfig(config Config) ConfigOption

func WithConnectionLimits

func WithConnectionLimits(perUser, roomsPerUser, channelsPerUser int) ConfigOption

func WithFeatures

func WithFeatures(rooms, channels, presence, typing, history bool) ConfigOption

func WithLocalBackend

func WithLocalBackend() ConfigOption

func WithMessageLimits

func WithMessageLimits(maxSize, maxPerSecond int) ConfigOption

func WithMessageRetention

func WithMessageRetention(retention time.Duration) ConfigOption

func WithNATSBackend

func WithNATSBackend(urls ...string) ConfigOption

func WithNodeID

func WithNodeID(nodeID string) ConfigOption

func WithPresenceTimeout

func WithPresenceTimeout(timeout time.Duration) ConfigOption

func WithRedisBackend

func WithRedisBackend(url string) ConfigOption

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

func WithTimeouts

func WithTimeouts(ping, pong, write time.Duration) ConfigOption

func WithTypingTimeout

func WithTypingTimeout(timeout time.Duration) ConfigOption

type Connection

type Connection = internal.EnhancedConnection

func NewConnection

func NewConnection(conn forge.Connection) Connection

NewConnection creates a new enhanced connection.

type ConnectionInfo

type ConnectionInfo = internal.ConnectionInfo

Connection types

type DeviceInfo

type DeviceInfo = internal.DeviceInfo

type DistributedBackend

type DistributedBackend = internal.DistributedBackend

Distributed backend

type DistributedBackendOptions

type DistributedBackendOptions = internal.DistributedBackendOptions

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for streaming functionality.

func (*Extension) AsyncAPISpec

func (e *Extension) AsyncAPISpec() *forge.AsyncAPISpec

AsyncAPISpec generates AsyncAPI 3.0.0 specification for the streaming extension This documents all streaming channels, operations, and message types

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks if the streaming extension is healthy.

func (*Extension) Manager

func (e *Extension) Manager() Manager

Manager returns the streaming manager (for advanced usage).

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the streaming extension with the app.

func (*Extension) RegisterRoutes

func (e *Extension) RegisterRoutes(router forge.Router, wsPath, ssePath string) error

RegisterRoutes is a helper to register WebSocket and SSE routes with the router.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the streaming extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the streaming extension.

type FileInfo

type FileInfo = internal.FileInfo

type FileQuery

type FileQuery = internal.FileQuery

type FileUpload

type FileUpload = internal.FileUpload

File types

type HistoryQuery

type HistoryQuery = internal.HistoryQuery

Query types

type Invite

type Invite = internal.Invite

Room types

type InviteOptions

type InviteOptions = internal.InviteOptions

type LocalRoom

type LocalRoom = *local.LocalRoom

Room creation (local backend)

type Lock

type Lock = internal.Lock

type Manager

type Manager = internal.Manager

Core interfaces

func NewManager

func NewManager(
	config Config,
	roomStore RoomStore,
	channelStore ChannelStore,
	messageStore MessageStore,
	presenceTracker PresenceTracker,
	typingTracker TypingTracker,
	distributed DistributedBackend,
	logger forge.Logger,
	metrics forge.Metrics,
) Manager

NewManager creates a new streaming manager.

type ManagerStats

type ManagerStats = internal.ManagerStats

type Member

type Member = internal.Member

type MemberOptions

type MemberOptions = internal.MemberOptions

type Message

type Message = internal.Message

Message types

type MessageEdit

type MessageEdit = internal.MessageEdit

type MessageHandler

type MessageHandler = internal.MessageHandler

type MessageReaction

type MessageReaction = internal.MessageReaction

type MessageSearchQuery

type MessageSearchQuery = internal.MessageSearchQuery

type MessageStore

type MessageStore = internal.MessageStore

type ModerationEvent

type ModerationEvent = internal.ModerationEvent

type ModerationStatus

type ModerationStatus = internal.ModerationStatus

Moderation types

type NodeChangeEvent

type NodeChangeEvent = internal.NodeChangeEvent

type NodeChangeHandler

type NodeChangeHandler = internal.NodeChangeHandler

type NodeInfo

type NodeInfo = internal.NodeInfo

type OnlineStats

type OnlineStats = internal.OnlineStats

type PresenceEvent

type PresenceEvent = internal.PresenceEvent

Presence types

type PresenceFilters

type PresenceFilters = internal.PresenceFilters

type PresenceOptions

type PresenceOptions = internal.PresenceOptions

type PresenceStore

type PresenceStore = internal.PresenceStore

type PresenceTracker

type PresenceTracker = internal.PresenceTracker

Tracker interfaces

type RateLimitStatus

type RateLimitStatus = internal.RateLimitStatus

Rate limiting

type Room

type Room = internal.Room

type RoomBan

type RoomBan = internal.RoomBan

type RoomEvent

type RoomEvent = internal.RoomEvent

type RoomOptions

type RoomOptions = internal.RoomOptions

type RoomStats

type RoomStats = internal.RoomStats

Statistics types

type RoomStore

type RoomStore = internal.RoomStore

Store interfaces

type TypingOptions

type TypingOptions = internal.TypingOptions

Typing

type TypingStore

type TypingStore = internal.TypingStore

type TypingTracker

type TypingTracker = internal.TypingTracker

type UserPresence

type UserPresence = internal.UserPresence

type UserPresenceStats

type UserPresenceStats = internal.UserPresenceStats

type UserStats

type UserStats = internal.UserStats

type WebhookConfig

type WebhookConfig = internal.WebhookConfig

Webhook types

Directories

Path Synopsis
examples
chat command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL