realtime

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: MIT Imports: 22 Imported by: 0

README

English | 中文

realtime

A production-grade WebSocket real-time gateway framework for Go.

Implements http.Handler — embed it into any net/http server and get full WebSocket lifecycle management: authentication, message routing with middleware, online presence tracking, backpressure control, and graceful shutdown. Supports single-node and distributed multi-node deployments via a pluggable broker architecture.

Go Reference Go Report Card

Features

  • Drop-in integration — implements http.Handler, works with any Go HTTP server
  • Pluggable authenticationAuthenticator interface with built-in anonymous and static-token providers
  • Flexible routing — exact match, longest-prefix match, and fallback handlers
  • Middleware chain — composable message middleware with built-in recovery, timeout, and logging
  • Cross-node delivery — pluggable Broker interface with local (in-process) and RabbitMQ implementations
  • Online presence — pluggable Presence interface with in-memory and Redis (TTL-based) implementations
  • Backpressure control — configurable policies: close, drop-newest, drop-oldest
  • Graceful shutdown — orderly session drain, broker/presence cleanup
  • JSON codec — pluggable Codec interface, JSON by default

Quick Start

go get github.com/forerunner-space/realtime
Minimal Server
package main

import (
    "context"
    "log"
    "net/http"

    "github.com/forerunner-space/realtime"
)

func main() {
    rt := realtime.New()

    rt.Handle("chat", func(ctx context.Context, s *realtime.Session, msg *realtime.Message) error {
        return rt.Broadcast(ctx, "chat", msg.Data)
    })

    http.Handle("/ws", rt)
    log.Fatal(http.ListenAndServe(":8080", nil))
}
With Authentication
rt := realtime.New(
    realtime.WithAuthenticator(&realtime.StaticTokenAuthenticator{
        Token:   "my-secret-token",
        Identity: realtime.Identity{ID: "server", TenantID: "default"},
    }),
)
Distributed Multi-Node with RabbitMQ and Redis
import (
    "github.com/forerunner-space/realtime"
    "github.com/forerunner-space/realtime/broker"
    "github.com/forerunner-space/realtime/presence"
)

rt := realtime.New(
    realtime.WithBroker(broker.RabbitMQ(broker.RabbitMQOptions{
        URL: "amqp://guest:guest@localhost:5672/",
    })),
    realtime.WithPresence(presence.Redis(presence.RedisOptions{
        Addr: "localhost:6379",
    })),
)

if err := rt.Start(ctx); err != nil {
    log.Fatal(err)
}

Architecture

HTTP Request
    │
    ▼
ServeHTTP (origin check → connection limit → authenticate → WS upgrade)
    │
    ▼
Session created → registered in Hub + Presence
    │
    ▼
OnConnect hooks (failure rolls back registration)
    │
    ▼
Read Loop:  WS frame → Codec.Unmarshal → Middleware chain → Router → Handler
Write Loop: Outbound queue → WS write (with heartbeat pings)
    │
    ▼
On disconnect: unregister presence → remove from hub → OnDisconnect hooks

API

Core Methods
Method Description
New(options ...Option) *Realtime Create a gateway with single-node defaults
Start(ctx) error Subscribe to broker topics for distributed delivery
Shutdown(ctx) error Graceful shutdown
ServeHTTP(w, r) http.Handler — upgrade and run session
Messaging
Method Description
Broadcast(ctx, route, data) error Fan-out to all sessions on all nodes
SendToIdentity(ctx, tenantID, identityID, route, data) error Targeted delivery to one identity
IsOnline(ctx, tenantID, identityID) (bool, error) Check if identity has online sessions
GetIdentitySessions(ctx, tenantID, identityID) ([]SessionInfo, error) List online sessions
Routing & Hooks
Method Description
Handle(route, HandlerFunc) Exact route handler
HandlePrefix(prefix, HandlerFunc) Prefix route handler (longest prefix wins)
OnMessage(HandlerFunc) Fallback handler
Use(middleware ...Middleware) Append message middleware
OnConnect(func(ctx, *Session) error) Post-registration hook
OnDisconnect(DisconnectHandler) Post-cleanup hook
Session Methods
Method Description
Send(ctx, route, data) error Send a message to this session
Close(code, reason) error Graceful close
ID() string Session unique ID
Identity() Identity Authenticated identity
Closed() bool Whether session is closing
Options
realtime.WithConfig(config.Config)
realtime.WithAuthenticator(auth)
realtime.WithCodec(codec)
realtime.WithBroker(broker)
realtime.WithPresence(presence)
realtime.WithLogger(logger)
realtime.WithDebug(true)
realtime.WithHeartbeat(30 * time.Second)
realtime.WithSendQueueSize(256)
realtime.WithBackpressure(realtime.DropOldest)
realtime.WithReadLimit(1 << 20)     // 1 MB
realtime.WithWriteTimeout(10 * time.Second)
realtime.WithAuthTimeout(5 * time.Second)
realtime.WithMaxConnections(10000)
realtime.WithOriginPatterns("*.example.com")
Built-in Middleware
realtime.Recovery()          // panic → error
realtime.Timeout(5 * time.Second)
realtime.Logger()

Sub-Packages

Package Description
auth Authenticator constructors: Anonymous(), StaticToken(token, identity)
broker Broker constructors: Local(nodeID), RabbitMQ(opts)
codec Codec constructors: JSON()
presence Presence constructors: Memory(), Redis(opts)

Interfaces

The framework is built around four pluggable interfaces:

type Authenticator interface {
    Authenticate(ctx context.Context, r *http.Request) (*Identity, error)
}

type Codec interface {
    Marshal(v any) ([]byte, error)
    Unmarshal(data []byte, v any) error
}

type Broker interface {
    Publish(ctx context.Context, topic string, msg []byte) error
    Subscribe(ctx context.Context, topic string, handler BrokerHandler) (Subscription, error)
    Close() error
}

type Presence interface {
    Register(ctx context.Context, info *SessionInfo) error
    Unregister(ctx context.Context, sessionID string) error
    GetIdentitySessions(ctx context.Context, tenantID, identityID string) ([]SessionInfo, error)
    IsOnline(ctx context.Context, tenantID, identityID string) (bool, error)
}

Error Sentinels

ErrRealtimeClosed       // gateway is shut down
ErrUnknownRoute         // no matching route handler
ErrBackpressure         // send queue full
ErrSessionClosed        // session already closed
ErrAlreadyStarted       // Start() called twice
ErrInvalidOption        // invalid configuration
ErrUnauthorized         // authentication failed
ErrConnectionLimit      // max connections reached

Testing

go test ./...

License

MIT

Documentation

Overview

Package realtime — see types.go for the full package doc. 本文件实现内置认证器:AnonymousAuthenticator(匿名)和 StaticTokenAuthenticator(静态令牌)。

Package realtime — see types.go for the full package doc. 本文件提供进程内本地 Broker 的根包适配器。

Package realtime — see types.go for the full package doc. 本文件实现基于 encoding/json 的默认编解码器(jsonCodec)。

Package realtime — see types.go for the full package doc. 本文件定义运行时配置结构体 Config、背压策略和默认配置常量。

Package realtime — see types.go for the full package doc. 本文件实现消息投递:广播(Broadcast)和定向发送(SendToIdentity),包含 broker 消息处理。

Package realtime — see types.go for the full package doc. 本文件定义框架使用的哨兵错误(sentinel errors)。

Package realtime — see types.go for the full package doc. 本文件实现生命周期钩子(OnConnect/OnDisconnect/OnError)、消息路由注册和中间件管理。

Package realtime — see types.go for the full package doc. 本文件实现框架内部 ID 生成工具,使用加密随机字节生成唯一标识符。

Package realtime — see types.go for the full package doc. 本文件实现本地会话索引(localHub),按 session ID 和 identity ID 维护在线连接。

Package realtime — see types.go for the full package doc. 本文件实现标准库 log.Printf 的 LogSink 适配器(stdLogger)。

Package realtime — see types.go for the full package doc. 本文件实现消息(Message)的创建、JSON 解码和克隆方法。

Package realtime — see types.go for the full package doc. 本文件实现内置中间件(Recovery、Timeout、Logger)和中间件链构建工具。

Package realtime — see types.go for the full package doc. 本文件定义选项模式(Option pattern)和所有 With* 配置函数。

Package realtime — see types.go for the full package doc. 本文件提供基于内存的在线状态追踪器根包适配器。

Package realtime — see types.go for the full package doc. 本文件实现框架门面结构体 Realtime、HTTP Upgrade 处理和连接生命周期管理。

Package realtime — see types.go for the full package doc. 本文件实现内部路由器,支持精确匹配、最长前缀匹配和兜底处理三种路由策略。

Package realtime — see types.go for the full package doc. 本文件实现会话(Session)结构体,代表一个已连接的 WebSocket 客户端。

Package realtime provides a production-ready WebSocket gateway framework. Package realtime 提供生产级 WebSocket 实时网关框架,支持连接认证、消息路由、中间件链、 在线状态追踪和跨节点消息投递。核心结构 Realtime 实现了 http.Handler 接口,可直接嵌入任何 Go HTTP 服务。

Index

Constants

View Source
const (
	// DefaultHeartbeatInterval 是默认服务端 ping 间隔。
	DefaultHeartbeatInterval = 30 * time.Second
	// DefaultPongTimeout 是等待 ping/pong 完成的默认超时。
	DefaultPongTimeout = 10 * time.Second
	// DefaultWriteTimeout 是单次写消息的默认超时。
	DefaultWriteTimeout = 10 * time.Second
	// DefaultReadLimit 是单条入站消息的默认大小上限。
	DefaultReadLimit = 1 << 20
	// DefaultSendQueueSize 是每个 session 的默认发送队列长度。
	DefaultSendQueueSize = 256
	// DefaultShutdownTimeout 是 Shutdown 未传 deadline 时使用的默认超时。
	DefaultShutdownTimeout = 10 * time.Second
	// DefaultAuthTimeout 是 Upgrade 前认证流程的默认超时。
	DefaultAuthTimeout = 5 * time.Second
	// DefaultMaxConnections 是单个 Realtime 实例默认允许的最大连接数。
	DefaultMaxConnections = 10000
	// DefaultPresenceTTL 是在线状态租约的默认有效期。
	DefaultPresenceTTL = 90 * time.Second
)

Variables

View Source
var (
	// ErrRealtimeClosed 表示 Realtime 正在关闭或已经关闭,不能再接受新操作。
	ErrRealtimeClosed = errors.New("realtime: shutdown in progress")
	// ErrUnknownRoute 表示消息 route 没有匹配到任何 handler。
	ErrUnknownRoute = errors.New("realtime: unknown route")
	// ErrBackpressure 表示 session 发送队列已满并触发背压策略。
	ErrBackpressure = errors.New("realtime: send queue full")
	// ErrSessionClosed 表示目标 session 已关闭或不在当前节点。
	ErrSessionClosed = errors.New("realtime: session closed")
	// ErrAlreadyStarted 表示组件已经启动,不能重复执行互斥初始化。
	ErrAlreadyStarted = errors.New("realtime: already started")
	// ErrInvalidOption 表示配置或认证输入不合法。
	ErrInvalidOption = errors.New("realtime: invalid option")
	// ErrUnauthorized 表示认证凭证缺失或不匹配。
	ErrUnauthorized = errors.New("realtime: unauthorized")
	// ErrInvalidBrokerMessage 表示 broker 收到的消息信封格式不合法。
	ErrInvalidBrokerMessage = errors.New("realtime: invalid broker message")
	// ErrConnectionLimit 表示当前实例已达到连接数上限。
	ErrConnectionLimit = errors.New("realtime: connection limit reached")
)

Functions

This section is empty.

Types

type AnonymousAuthenticator

type AnonymousAuthenticator struct{}

AnonymousAuthenticator allows requests without authentication. AnonymousAuthenticator 是默认认证器,所有请求都会得到 anonymous 身份。

func (AnonymousAuthenticator) Authenticate

func (AnonymousAuthenticator) Authenticate(ctx context.Context, r *http.Request) (*Identity, error)

Authenticate returns the anonymous identity. Authenticate 不读取请求内容,直接返回匿名身份。

type Authenticator

type Authenticator interface {
	Authenticate(ctx context.Context, r *http.Request) (*Identity, error)
}

Authenticator authenticates an HTTP upgrade request. Authenticator 在 WebSocket Upgrade 前执行,用于把 HTTP 请求转换为身份快照。

type BackpressurePolicy

type BackpressurePolicy int

BackpressurePolicy controls behavior when a session send queue is full. BackpressurePolicy 定义慢消费者出现时的处理方式。

const (
	// CloseOnBackpressure 关闭慢消费者,是默认的安全策略。
	CloseOnBackpressure BackpressurePolicy = iota
	// DropNewest 丢弃本次发送,保留队列中已有消息。
	DropNewest
	// DropOldest 丢弃队列中最旧消息,再放入本次发送。
	DropOldest
)

type Broker

type Broker interface {
	Publish(ctx context.Context, topic string, msg []byte) error
	Subscribe(ctx context.Context, topic string, handler BrokerHandler) (Subscription, error)
	Close() error
}

Broker publishes messages to local or remote subscribers. Broker 抽象跨节点消息投递;本地实现用于单进程,RabbitMQ 等实现用于分布式部署。

type BrokerHandler

type BrokerHandler func(ctx context.Context, msg BrokerMessage) error

BrokerHandler handles broker messages. BrokerHandler 处理从 broker topic 收到的原始消息。

type BrokerMessage

type BrokerMessage struct {
	Topic      string
	Data       []byte
	SourceNode string
}

BrokerMessage is a message delivered by a Broker. BrokerMessage 是 broker 投递给框架的消息,Data 保持 broker payload 原样。

type Codec

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

Codec encodes and decodes framework envelopes. Codec 只负责 envelope 编解码,不理解业务 payload。

type Config

type Config struct {
	// HeartbeatInterval 控制服务端 ping 间隔。
	HeartbeatInterval time.Duration
	// DisableHeartbeat 显式禁用服务端 ping。
	DisableHeartbeat bool
	// PongTimeout 控制 ping/pong 检测的超时时间。
	PongTimeout time.Duration
	// DisablePongTimeout 显式禁用 pong 等待超时。
	DisablePongTimeout bool
	// WriteTimeout 控制单次 WebSocket 写操作的超时时间。
	WriteTimeout time.Duration
	// DisableWriteTimeout 显式禁用写超时。
	DisableWriteTimeout bool
	// ReadLimit 控制单条入站消息的最大字节数。
	ReadLimit int64
	// SendQueueSize 控制每个 session 的出站队列长度。
	SendQueueSize int
	// Backpressure 控制出站队列满时的处理方式。
	Backpressure BackpressurePolicy
	// ShutdownTimeout 控制 Shutdown 未传 deadline 时的默认等待时间。
	ShutdownTimeout time.Duration
	// DisableShutdownTimeout 显式禁用 Shutdown 默认等待超时。
	DisableShutdownTimeout bool
	// AuthTimeout 控制 Upgrade 前认证流程的超时时间。
	AuthTimeout time.Duration
	// DisableAuthTimeout 显式禁用认证超时。
	DisableAuthTimeout bool
	// MaxConnections 控制当前实例最大连接数,0 表示不限制。
	MaxConnections int
	// DisableMaxConnections 显式禁用默认连接数上限。
	DisableMaxConnections bool
	// PresenceTTL 控制 presence 在线快照的租约时间。
	PresenceTTL time.Duration
	// DisablePresenceTTL 显式禁用 presence 在线快照租约。
	DisablePresenceTTL bool
	// NodeID 标识当前节点,用于 broker 定向投递和 presence 快照。
	NodeID string
	// Debug 开关框架内部调试日志。
	Debug bool
	// OriginPatterns 定义允许的 Origin host 或 URL pattern。
	OriginPatterns []string
	// InsecureOrigins 关闭 Origin 校验,仅建议开发环境使用。
	InsecureOrigins bool
}

Config contains framework settings. Zero values are replaced with safe defaults. Config 只承载基础运行时配置;外部依赖通过 Option 注入,避免把配置结构体变成组件工厂。

type DisconnectHandler

type DisconnectHandler func(ctx context.Context, s *Session, info DisconnectInfo)

DisconnectHandler handles the final disconnect notification for a session. DisconnectHandler 在 session 已完成本地 hub 和 presence 清理后执行;业务侧看到的是最终下线状态。

type DisconnectInfo

type DisconnectInfo struct {
	// Err 是导致断开的底层错误;主动关闭时可能为 nil。
	Err error
	// Code 是 WebSocket close code;无法获得时为 0。
	Code int
	// Reason 是 WebSocket close reason 或框架记录的关闭原因。
	Reason string
	// Expected 表示该断开是否属于正常关闭、服务端关闭或可预期网络断开。
	Expected bool
	// Initiator 表示断开发起方或触发断开的框架子系统。
	Initiator DisconnectInitiator
	// At 是框架记录断开原因的时间,使用 UTC。
	At time.Time
}

DisconnectInfo describes why a session was closed. DisconnectInfo 是 OnDisconnect 接收到的结构化断开原因;Err 保留底层错误,Code/Reason 保留关闭帧语义。

type DisconnectInitiator

type DisconnectInitiator string

DisconnectInitiator identifies which side or subsystem initiated a disconnect. DisconnectInitiator 描述连接断开的发起来源,便于业务区分正常离线、服务端关闭和背压踢下线。

const (
	// DisconnectByClient 表示客户端主动关闭或网络正常断开。
	DisconnectByClient DisconnectInitiator = "client"
	// DisconnectByServer 表示业务代码调用 Session.Close 主动关闭。
	DisconnectByServer DisconnectInitiator = "server"
	// DisconnectByShutdown 表示 Realtime.Shutdown 触发的服务端下线。
	DisconnectByShutdown DisconnectInitiator = "shutdown"
	// DisconnectByBackpressure 表示发送队列满导致框架按背压策略关闭慢消费者。
	DisconnectByBackpressure DisconnectInitiator = "backpressure"
	// DisconnectByError 表示读写循环遇到非预期错误后关闭连接。
	DisconnectByError DisconnectInitiator = "error"
)

type HandlerFunc

type HandlerFunc func(ctx context.Context, s *Session, msg *Message) error

HandlerFunc handles one decoded realtime message for a connected session. HandlerFunc 是业务消息处理函数;框架只保证 msg 是 envelope,Data 字段含义由业务自行定义。

type Identity

type Identity struct {
	ID       string
	TenantID string
	Roles    []string
	Claims   map[string]any
}

Identity is an authenticated caller snapshot. Identity 是认证结果快照。框架不假设它等价于用户、会员或其他业务模型。

func (Identity) Clone

func (i Identity) Clone() Identity

Clone returns a copy for safe cross-boundary use. Roles and the Claims map are copied at the outer container level. Claim values are not deep-copied because they may contain arbitrary application objects; keep nested values immutable or clone them before attaching them to an Identity. Clone 会复制 Roles 和 Claims map 外层;Claims 内部 value 可能是任意业务对象, 框架不做通用深拷贝,嵌套值应由业务保持不可变或自行拷贝。

type LogSink

type LogSink interface {
	Printf(format string, args ...any)
}

LogSink is the minimal logging interface used by the framework. LogSink 是框架使用的最小日志接口,便于接入标准库 logger 或业务日志。

type Message

type Message struct {
	ID       string            `json:"id,omitempty"`
	Route    string            `json:"route,omitempty"`
	Data     json.RawMessage   `json:"data,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

Message is the framework envelope. Data belongs to the caller. Message 是框架唯一理解的消息信封;框架只使用 Route 做分发,Data 内部结构完全属于业务层。

func NewMessage

func NewMessage(route string, data any) (*Message, error)

NewMessage creates an envelope with JSON-encoded Data. NewMessage 根据 route 和业务 data 构造消息信封;RawMessage 和 []byte 会被按原始 JSON 复制。

func (*Message) Clone

func (m *Message) Clone() *Message

Clone returns a snapshot copy of the message. Clone 用于跨边界传递消息,避免 handler 或发送路径共享可变 RawMessage/Metadata。

func (*Message) Decode

func (m *Message) Decode(v any) error

Decode decodes the message Data payload as JSON. Decode 是业务 handler 中解析 Data 的便捷方法;空 Data 会被视为无操作。

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

Middleware wraps a message handler. Middleware 只包裹消息处理链,不参与连接认证和底层读写循环。

func Logger

func Logger() Middleware

Logger logs handled messages and handler errors. Logger 在业务 handler 返回错误时记录 session、route 和错误内容。

func Recovery

func Recovery() Middleware

Recovery converts panics in handlers to errors. Recovery 将业务 handler panic 转换为 error,避免单条消息打断连接循环。

func Timeout

func Timeout(timeout time.Duration) Middleware

Timeout bounds handler execution time. Timeout 为业务 handler 增加上下文超时;timeout <= 0 时不做限制。

type Option

type Option func(*Options)

Option configures a Realtime instance. Option 用于注入配置或替换框架依赖;nil Option 会被忽略。

func WithAuthTimeout

func WithAuthTimeout(timeout time.Duration) Option

WithAuthTimeout sets the timeout for authentication. WithAuthTimeout 设置 Upgrade 前认证流程的超时时间。

func WithAuthenticator

func WithAuthenticator(a Authenticator) Option

WithAuthenticator sets the authenticator used before WebSocket upgrade. WithAuthenticator 设置 Upgrade 前使用的认证器;传入 nil 时保持原配置。

func WithBackpressure

func WithBackpressure(policy BackpressurePolicy) Option

WithBackpressure sets the full send-queue policy. WithBackpressure 设置发送队列满时的背压策略,默认关闭慢消费者。

func WithBroker

func WithBroker(b Broker) Option

WithBroker sets the broker used for distributed delivery. WithBroker 设置跨节点投递使用的 broker;单节点场景可使用默认本地 broker。

func WithCodec

func WithCodec(c Codec) Option

WithCodec sets the envelope codec. WithCodec 设置消息信封编解码器;传入 nil 时保持原配置。

func WithConfig

func WithConfig(cfg Config) Option

WithConfig applies a Config. Explicit dependency options can still override dependencies. WithConfig 应用基础运行配置;认证、编码器、broker、presence 等依赖仍通过独立 Option 注入。

func WithDebug

func WithDebug(enabled bool) Option

WithDebug enables or disables framework debug logs. WithDebug 开关框架调试日志,主要用于排查广播、定向投递和 broker 消息流。

func WithHeartbeat

func WithHeartbeat(interval time.Duration) Option

WithHeartbeat sets the ping interval for each session. WithHeartbeat 设置服务端 ping 间隔,用于检测不可用连接。

func WithInsecureOrigins

func WithInsecureOrigins() Option

WithInsecureOrigins disables origin verification. Prefer WithOriginPatterns in production. WithInsecureOrigins 关闭 Origin 校验,仅建议本地开发或受信任内网环境使用。

func WithLogger

func WithLogger(l LogSink) Option

WithLogger sets the framework logger. WithLogger 设置框架内部错误和 debug 日志输出;传入 nil 时保持原配置。

func WithMaxConnections

func WithMaxConnections(max int) Option

WithMaxConnections sets the connection cap. Use 0 to explicitly disable the cap. WithMaxConnections 设置当前 Realtime 实例的连接数上限;传 0 表示显式禁用上限。

func WithOriginPatterns

func WithOriginPatterns(patterns ...string) Option

WithOriginPatterns sets allowed Origin host or URL patterns. WithOriginPatterns 设置允许的 Origin 匹配规则;生产环境优先使用它而不是关闭校验。

func WithPresence

func WithPresence(p Presence) Option

WithPresence sets the online session index. WithPresence 设置在线状态存储;分布式部署通常应替换为 Redis 等共享实现。

func WithReadLimit

func WithReadLimit(limit int64) Option

WithReadLimit sets the maximum inbound WebSocket message size. WithReadLimit 设置单条入站 WebSocket 消息大小上限。

func WithSendQueueSize

func WithSendQueueSize(size int) Option

WithSendQueueSize sets the per-session outbound queue size. WithSendQueueSize 设置每个连接的发送队列长度,影响慢消费者的缓冲空间。

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets the timeout for each outbound write. WithWriteTimeout 设置单次写消息的超时时间。

type Options

type Options struct {
	Config
	Authenticator Authenticator // 连接认证器 / Connection authenticator
	Codec         Codec         // 消息编解码器 / Message envelope codec
	Broker        Broker        // 跨节点消息代理 / Cross-node message broker
	Presence      Presence      // 在线状态追踪器 / Online session presence tracker
	Logger        LogSink       // 框架日志输出 / Framework diagnostic logger
}

Options contains dependencies and settings after all options are applied. Options 是 New 应用所有 Option 后的运行时依赖集合,通常无需业务直接构造。

type Presence

type Presence interface {
	Register(ctx context.Context, session *SessionInfo) error
	Unregister(ctx context.Context, sessionID string) error
	GetIdentitySessions(ctx context.Context, tenantID, identityID string) ([]SessionInfo, error)
	IsOnline(ctx context.Context, tenantID, identityID string) (bool, error)
}

Presence stores online session indexes. Presence 存储在线 session 快照;分布式实现应在内部处理 TTL、节点和 stale cleanup。

type Realtime

type Realtime struct {
	// contains filtered or unexported fields
}

Realtime is the framework facade and HTTP handler. Realtime 是框架入口,负责 HTTP Upgrade、连接生命周期、路由分发、presence 和 broker 协调。

func New

func New(options ...Option) *Realtime

New creates a realtime gateway with safe single-node defaults. New 使用安全的单节点默认值创建 Realtime;可通过 Option 替换认证、编码、broker 和 presence。

func (*Realtime) Broadcast

func (r *Realtime) Broadcast(ctx context.Context, route string, data any) error

Broadcast sends one envelope to every currently connected session on every started node. Use this only for true global fan-out. For messages targeting one identity/account, use SendToIdentity so only nodes that hold matching online sessions receive broker payloads. Broadcast 会先投递本节点在线 session,再通过 broker 投递到其他已 Start 的节点。

func (*Realtime) GetIdentitySessions

func (r *Realtime) GetIdentitySessions(ctx context.Context, tenantID, identityID string) ([]SessionInfo, error)

GetIdentitySessions returns online session snapshots for an identity. GetIdentitySessions 返回指定身份当前在线的 session 快照,结果来自 Presence。

func (*Realtime) Handle

func (r *Realtime) Handle(route string, h HandlerFunc)

Handle registers an exact route handler. Handle 精确匹配 Message.Route,优先级高于 prefix 和 fallback handler。

func (*Realtime) HandlePrefix

func (r *Realtime) HandlePrefix(prefix string, h HandlerFunc)

HandlePrefix registers a prefix route handler. HandlePrefix 适合按业务域分发 route;多个前缀命中时使用最长前缀。

func (*Realtime) IsOnline

func (r *Realtime) IsOnline(ctx context.Context, tenantID, identityID string) (bool, error)

IsOnline reports whether an identity currently has at least one online session. IsOnline 查询指定租户身份是否至少存在一个在线 session。

func (*Realtime) OnConnect

func (r *Realtime) OnConnect(h func(context.Context, *Session) error)

OnConnect registers a hook that runs after presence and local hub registration. OnConnect 返回错误会拒绝本次连接,并回滚已经写入的在线状态。

func (*Realtime) OnDisconnect

func (r *Realtime) OnDisconnect(h DisconnectHandler)

OnDisconnect registers a hook that runs after the session is fully cleaned up. OnDisconnect 接收结构化 DisconnectInfo;此时 session 已从 hub 和 presence 中移除。

func (*Realtime) OnError

func (r *Realtime) OnError(h func(context.Context, error))

OnError registers a framework error hook. OnError 接收认证、消息处理、broker 分发和非预期断开等错误;未注册时错误会写入 Logger。

func (*Realtime) OnMessage

func (r *Realtime) OnMessage(h HandlerFunc)

OnMessage registers the fallback handler used when no route-specific handler matches. OnMessage 适合作为简单应用的统一入口;复杂应用优先使用 Handle 或 HandlePrefix。

func (*Realtime) SendToIdentity

func (r *Realtime) SendToIdentity(ctx context.Context, tenantID, identityID, route string, data any) error

SendToIdentity sends one envelope to all currently online sessions for an identity. Presence is queried first, then remote broker payloads are published only to the node topics that currently hold matching sessions. SendToIdentity 适合账号/身份级定向消息;没有在线 session 时返回 ErrSessionClosed。

func (*Realtime) ServeHTTP

func (r *Realtime) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP upgrades HTTP requests to websocket sessions. ServeHTTP 实现 http.Handler,完成 Origin 校验、认证、Upgrade 和 session 运行。

func (*Realtime) Shutdown

func (r *Realtime) Shutdown(ctx context.Context) error

Shutdown rejects new upgrades, closes sessions, unregisters presence, and closes the broker. Shutdown 会拒绝新连接、关闭现有 session、等待清理完成,然后关闭 broker 和 presence。

func (*Realtime) Start

func (r *Realtime) Start(ctx context.Context) error

Start starts broker subscriptions used by distributed delivery. Start 启动分布式投递所需的 broker 订阅;单节点只使用本地发送时可以不调用。

func (*Realtime) Use

func (r *Realtime) Use(middleware ...Middleware)

Use appends message middlewares to the dispatch chain. Use 按注册顺序追加中间件,实际执行顺序与常见 HTTP middleware 一致:先注册的先进入。

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents one connected realtime client. Session 是业务代码最常接触的连接对象。它只暴露只读快照和发送/关闭能力, 不暴露底层 websocket、队列和内部索引,避免调用方破坏连接生命周期。

func (*Session) Close

func (s *Session) Close(code int, reason string) error

Close gracefully closes the session from application code. Close 是幂等方法;第一次调用会记录 server 发起的关闭原因,后续调用只返回零值。

func (*Session) CloseInfo

func (s *Session) CloseInfo() DisconnectInfo

CloseInfo returns the recorded disconnect information. CloseInfo 在连接尚未关闭时返回零值;关闭后返回第一次触发关闭时记录的原因。

func (*Session) Closed

func (s *Session) Closed() bool

Closed reports whether the session has started closing. Closed 用于业务在异步发送前快速判断连接是否已关闭;真实发送仍以 Send 返回值为准。

func (*Session) ConnectedAt

func (s *Session) ConnectedAt() time.Time

ConnectedAt 返回 session 建立时间,使用 UTC。

func (*Session) Context

func (s *Session) Context() context.Context

Context 返回随 session 关闭而取消的上下文。

func (*Session) ID

func (s *Session) ID() string

ID 返回 session 的框架内唯一标识。

func (*Session) Identity

func (s *Session) Identity() Identity

Identity 返回连接认证身份的快照副本。

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() string

RemoteAddr 返回 HTTP Upgrade 请求中的远端地址。

func (*Session) Send

func (s *Session) Send(ctx context.Context, route string, data any) error

Send encodes data into a message envelope and queues it for the write loop. Send 是业务向当前连接发送消息的便捷方法;当队列已满时会按 BackpressurePolicy 处理。

func (*Session) SendMessage

func (s *Session) SendMessage(ctx context.Context, msg *Message) error

SendMessage queues an already-built message envelope for this session. SendMessage 会复制消息后再编码入队,避免调用方后续修改影响发送内容。

type SessionInfo

type SessionInfo struct {
	SessionID   string    `json:"session_id"`
	IdentityID  string    `json:"identity_id"`
	TenantID    string    `json:"tenant_id"`
	NodeID      string    `json:"node_id"`
	RemoteAddr  string    `json:"remote_addr"`
	ConnectedAt time.Time `json:"connected_at"`
	ExpiresAt   time.Time `json:"expires_at"`
}

SessionInfo is a presence snapshot for an online session. SessionInfo 是 Presence 层使用的在线状态快照,不应反向暴露内部 Session 指针。

type StaticTokenAuthenticator

type StaticTokenAuthenticator struct {
	Token    string   // 用于验证的静态 Bearer token / Static Bearer token for validation
	Identity Identity // 认证成功后返回的身份快照 / Identity snapshot returned on success
}

StaticTokenAuthenticator authenticates a fixed bearer token. StaticTokenAuthenticator 使用固定 Bearer token 认证请求,适合示例或简单内部服务。

func (StaticTokenAuthenticator) Authenticate

func (a StaticTokenAuthenticator) Authenticate(ctx context.Context, r *http.Request) (*Identity, error)

Authenticate validates Authorization: Bearer <token> and returns an identity snapshot. Authenticate 使用常量时间比较 token,成功后返回配置身份的副本。

type Subscription

type Subscription interface {
	Close() error
}

Subscription represents a broker subscription. Subscription 表示一个 broker 订阅,Close 后不应再接收该 topic 消息。

Directories

Path Synopsis
Package auth provides authenticator constructors for the realtime framework.
Package auth provides authenticator constructors for the realtime framework.
Package broker provides broker implementations for cross-node message delivery.
Package broker provides broker implementations for cross-node message delivery.
Package codec defines helpers for message envelope encoding/decoding.
Package codec defines helpers for message envelope encoding/decoding.
internal
localbroker
Package localbroker contains the shared in-process broker implementation.
Package localbroker contains the shared in-process broker implementation.
memorypresence
Package memorypresence contains the shared in-memory presence implementation.
Package memorypresence contains the shared in-memory presence implementation.
Package presence provides online session index implementations.
Package presence provides online session index implementations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL