Documentation
¶
Index ¶
- Constants
- Variables
- type AllLogsSubscription
- type AuthService
- type AuthServiceAdapter
- type ChangeEvent
- type ClientMessage
- type Connection
- func NewConnection(id string, conn *websocket.Conn, userID *string, role string, ...) *Connection
- func NewConnectionSync(id string, conn *websocket.Conn, userID *string, role string, ...) *Connection
- func NewConnectionWithQueueSize(id string, conn *websocket.Conn, userID *string, role string, ...) *Connection
- func (c *Connection) AllowMessage() bool
- func (c *Connection) Close() error
- func (c *Connection) Context() context.Context
- func (c *Connection) GetQueueStats() ConnectionQueueStats
- func (c *Connection) GetRoleAndClaims() (string, map[string]interface{})
- func (c *Connection) IsSlowClient() bool
- func (c *Connection) IsSubscribed(channel string) bool
- func (c *Connection) SendMessage(msg interface{}) error
- func (c *Connection) Subscribe(channel string) bool
- func (c *Connection) Unsubscribe(channel string)
- func (c *Connection) UpdateAuth(userID *string, role string, claims map[string]interface{})
- type ConnectionEvent
- type ConnectionEventType
- type ConnectionInfo
- type ConnectionQueueStats
- type Filter
- type GlobalBroadcast
- type Listener
- type ListenerPool
- type ListenerPoolConfig
- type ListenerPoolMetrics
- type LogSubscription
- type LogSubscriptionConfig
- type Manager
- func (m *Manager) AddConnection(id string, conn *websocket.Conn, userID *string, role string, ...) (*Connection, error)
- func (m *Manager) AddConnectionWithIP(id string, conn *websocket.Conn, userID *string, role string, ...) (*Connection, error)
- func (m *Manager) BroadcastConnectionEvent(event ConnectionEvent)
- func (m *Manager) BroadcastGlobal(channel string, tenantID string, message ServerMessage) error
- func (m *Manager) BroadcastToChannel(channel string, tenantID string, message ServerMessage) int
- func (m *Manager) GetConnectionCount() int
- func (m *Manager) GetConnectionsForStats() []ConnectionInfo
- func (m *Manager) GetDetailedStats() map[string]interface{}
- func (m *Manager) GetIPConnectionCount(ip string) int
- func (m *Manager) GetSlowClientsDisconnected() uint64
- func (m *Manager) GetUserConnectionCount(userID string) int
- func (m *Manager) RemoveConnection(id string)
- func (m *Manager) SetBaseConfig(cfg *config.Config)
- func (m *Manager) SetConnectionLimits(maxPerUser, maxPerIP int)
- func (m *Manager) SetMaxConnections(max int)
- func (m *Manager) SetMetrics(metrics *observability.Metrics)
- func (m *Manager) SetPubSub(ps pubsub.PubSub)
- func (m *Manager) Shutdown()
- type ManagerConfig
- type MessageType
- type PostgresChangesConfig
- type PresenceInfo
- type PresenceManager
- func (pm *PresenceManager) CleanupConnection(connID string) map[string]*PresenceInfo
- func (pm *PresenceManager) GetChannelPresenceCount(channel string) int
- func (pm *PresenceManager) GetPresenceState(channel string) map[string][]PresenceState
- func (pm *PresenceManager) Track(channel, key string, state PresenceState, userID *string, connID string, ...) (*PresenceInfo, bool)
- func (pm *PresenceManager) Untrack(channel, key, connID string) *PresenceInfo
- type PresenceState
- type RLSCacheConfig
- type RealtimeHandler
- type RealtimeListener
- type ServerMessage
- type Subscription
- type SubscriptionDB
- type SubscriptionManager
- func (sm *SubscriptionManager) CheckExecutionOwnership(ctx context.Context, executionID, userID, executionType string) (isOwner bool, exists bool, err error)
- func (sm *SubscriptionManager) Close()
- func (sm *SubscriptionManager) CreateAllLogsSubscription(subID, connID, category string, levels []string) (*AllLogsSubscription, error)
- func (sm *SubscriptionManager) CreateLogSubscription(subID, connID, executionID, executionType string) (*LogSubscription, error)
- func (sm *SubscriptionManager) CreateSubscription(subID string, connID string, userID string, role string, ...) (*Subscription, error)
- func (sm *SubscriptionManager) FilterEventForSubscribers(ctx context.Context, event *ChangeEvent) map[string]*ChangeEvent
- func (sm *SubscriptionManager) GetAllLogsSubscribers() map[string]*AllLogsSubscription
- func (sm *SubscriptionManager) GetLogSubscribers(executionID string) []string
- func (sm *SubscriptionManager) GetLogSubscriptionsByConnection(connID string) []*LogSubscription
- func (sm *SubscriptionManager) GetStats() map[string]interface{}
- func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription
- func (sm *SubscriptionManager) RemoveAllLogsSubscription(subID string) error
- func (sm *SubscriptionManager) RemoveConnectionAllLogsSubscriptions(connID string)
- func (sm *SubscriptionManager) RemoveConnectionLogSubscriptions(connID string)
- func (sm *SubscriptionManager) RemoveConnectionSubscriptions(connID string)
- func (sm *SubscriptionManager) RemoveLogSubscription(subID string) error
- func (sm *SubscriptionManager) RemoveSubscription(subID string) error
- func (sm *SubscriptionManager) UpdateConnectionClaims(connID string, newClaims map[string]interface{})
- func (sm *SubscriptionManager) UpdateConnectionRole(connID string, newRole string)
- type TokenClaims
Constants ¶
const ( DefaultRLSCacheTTL = 30 * time.Second // 30 seconds default DefaultRLSCacheMaxSize = 100000 // 100K entries default )
Default RLS cache settings (used when no config provided)
const AdminConnectionsChannel = "realtime:admin:connections"
AdminConnectionsChannel is the channel name for broadcasting connection events to admins
const AllLogsChannel = "fluxbase:all_logs"
AllLogsChannel is the PubSub channel for all log notifications (admin streaming).
const DefaultMaxSubscriptions = 100
const DefaultMessageQueueSize = 256
DefaultMessageQueueSize is the default size of the per-connection message queue
const LogChannel = "fluxbase:logs"
LogChannel is the PubSub channel for execution log notifications.
const MaxSlowClientWarnings = 3
MaxSlowClientWarnings is the number of slow client warnings before marking unhealthy
const WriteTimeout = 10 * time.Second
WriteTimeout is the maximum time allowed to write a message to a WebSocket client
Variables ¶
var ErrConnectionClosed = errors.New("connection is closed")
ErrConnectionClosed is returned when trying to send to a closed connection
var ErrMaxConnectionsReached = errors.New("maximum number of websocket connections reached")
ErrMaxConnectionsReached is returned when the maximum number of connections is reached
var ErrMaxIPConnectionsReached = errors.New("maximum number of websocket connections per IP reached")
ErrMaxIPConnectionsReached is returned when an IP has exceeded its connection limit
var ErrMaxUserConnectionsReached = errors.New("maximum number of websocket connections per user reached")
ErrMaxUserConnectionsReached is returned when a user has exceeded their connection limit
var ErrQueueFull = errors.New("message queue is full")
ErrQueueFull is returned when the message queue is full
var ErrSlowClient = errors.New("client is too slow to receive messages")
ErrSlowClient is returned when a client is too slow to receive messages
Functions ¶
This section is empty.
Types ¶
type AllLogsSubscription ¶
type AllLogsSubscription struct {
ID string
ConnID string
Category string // Optional filter by category
Levels []string // Optional filter by levels
}
AllLogsSubscription represents a subscription to all logs (admin only)
type AuthService ¶
type AuthService interface {
ValidateToken(token string) (*TokenClaims, error)
}
AuthService interface for JWT validation (allows mocking in tests)
type AuthServiceAdapter ¶
type AuthServiceAdapter struct {
// contains filtered or unexported fields
}
AuthServiceAdapter adapts auth.Service to realtime.AuthService interface
func NewAuthServiceAdapter ¶
func NewAuthServiceAdapter(service *auth.Service) *AuthServiceAdapter
NewAuthServiceAdapter creates a new auth service adapter
func (*AuthServiceAdapter) ValidateToken ¶
func (a *AuthServiceAdapter) ValidateToken(token string) (*TokenClaims, error)
ValidateToken validates a JWT token and returns claims
type ChangeEvent ¶
type ChangeEvent struct {
Type string `json:"type"` // INSERT, UPDATE, DELETE
Table string `json:"table"` // Table name
Schema string `json:"schema"` // Schema name
Record map[string]interface{} `json:"record"` // New record data
OldRecord map[string]interface{} `json:"old_record,omitempty"` // Old record data (for UPDATE/DELETE)
}
ChangeEvent represents a database change event
func ParseChangeEvent ¶
func ParseChangeEvent(payload string) (*ChangeEvent, error)
ParseChangeEvent parses a JSON payload into a ChangeEvent
type ClientMessage ¶
type ClientMessage struct {
Type MessageType `json:"type"`
Channel string `json:"channel,omitempty"`
Event string `json:"event,omitempty"` // INSERT, UPDATE, DELETE, or *
Schema string `json:"schema,omitempty"`
Table string `json:"table,omitempty"`
Filter string `json:"filter,omitempty"` // Filter: column=operator.value
Payload json.RawMessage `json:"payload,omitempty"`
Config json.RawMessage `json:"config,omitempty"` // Raw config - can be PostgresChangesConfig or LogSubscriptionConfig
SubscriptionID string `json:"subscription_id,omitempty"`
MessageID string `json:"messageId,omitempty"` // Optional message ID for broadcast acknowledgements
Token string `json:"token,omitempty"` // JWT token for access_token message type
}
ClientMessage represents a message from the client
type Connection ¶
type Connection struct {
ID string
Conn *websocket.Conn
Subscriptions map[string]bool
UserID *string
Role string
Claims map[string]interface{}
TenantID string
ConnectedAt time.Time
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, parentCtx context.Context) *Connection
NewConnection creates a new WebSocket connection with async message queue. If parentCtx is nil, context.Background() is used.
func NewConnectionSync ¶
func NewConnectionSync(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}) *Connection
NewConnectionSync creates a new connection with synchronous sending (for tests)
func NewConnectionWithQueueSize ¶
func NewConnectionWithQueueSize(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, queueSize int, parentCtx context.Context) *Connection
NewConnectionWithQueueSize creates a new WebSocket connection with custom queue size. If parentCtx is nil, context.Background() is used.
func (*Connection) AllowMessage ¶
func (c *Connection) AllowMessage() bool
func (*Connection) Close ¶
func (c *Connection) Close() error
Close closes the WebSocket connection and stops the writer goroutine
func (*Connection) Context ¶
func (c *Connection) Context() context.Context
Context returns the connection's context for cancellation signaling. This allows external goroutines to detect when the connection is being shut down.
func (*Connection) GetQueueStats ¶
func (c *Connection) GetQueueStats() ConnectionQueueStats
GetQueueStats returns statistics about the message queue
func (*Connection) GetRoleAndClaims ¶
func (c *Connection) GetRoleAndClaims() (string, map[string]interface{})
func (*Connection) IsSlowClient ¶
func (c *Connection) IsSlowClient() bool
IsSlowClient returns true if this connection has been marked as a slow client
func (*Connection) IsSubscribed ¶
func (c *Connection) IsSubscribed(channel string) bool
IsSubscribed checks if the connection is subscribed to a channel
func (*Connection) SendMessage ¶
func (c *Connection) SendMessage(msg interface{}) error
SendMessage queues a message to be sent to the WebSocket client asynchronously. Returns ErrQueueFull if the message queue is full (client is too slow). Returns ErrSlowClient if the client has been consistently too slow. Returns ErrConnectionClosed if the connection has been closed.
func (*Connection) Subscribe ¶
func (c *Connection) Subscribe(channel string) bool
Subscribe adds a channel subscription for this connection
func (*Connection) Unsubscribe ¶
func (c *Connection) Unsubscribe(channel string)
Unsubscribe removes a channel subscription for this connection
func (*Connection) UpdateAuth ¶
func (c *Connection) UpdateAuth(userID *string, role string, claims map[string]interface{})
UpdateAuth updates the connection's authentication context
type ConnectionEvent ¶
type ConnectionEvent struct {
Type ConnectionEventType `json:"type"`
ID string `json:"id"`
UserID *string `json:"user_id"`
Email *string `json:"email"`
DisplayName *string `json:"display_name,omitempty"`
RemoteAddr string `json:"remote_addr"`
ConnectedAt string `json:"connected_at"` // ISO 8601 format
Timestamp string `json:"timestamp"` // Event timestamp
}
ConnectionEvent represents a connection lifecycle event This is broadcast to the admin channel when connections are established or terminated
func NewConnectionEvent ¶
func NewConnectionEvent(eventType ConnectionEventType, conn *Connection, email *string, displayName *string) ConnectionEvent
NewConnectionEvent creates a new connection event
func (ConnectionEvent) ToServerMessage ¶
func (e ConnectionEvent) ToServerMessage() ServerMessage
ToServerMessage converts a ConnectionEvent to a ServerMessage for broadcasting
type ConnectionEventType ¶
type ConnectionEventType string
ConnectionEventType represents the type of connection event
const ( ConnectionEventConnected ConnectionEventType = "connected" ConnectionEventDisconnected ConnectionEventType = "disconnected" )
type ConnectionInfo ¶
type ConnectionInfo struct {
ID string `json:"id"`
UserID *string `json:"user_id"`
Email *string `json:"email"`
DisplayName *string `json:"display_name,omitempty"`
RemoteAddr string `json:"remote_addr"`
ConnectedAt string `json:"connected_at"`
}
ConnectionInfo represents detailed information about a connection
type ConnectionQueueStats ¶
type ConnectionQueueStats struct {
QueueLength int
QueueCapacity int
QueueHighWater int32
MessagesSent uint64
MessagesDropped uint64
SlowClientCount int32
}
ConnectionQueueStats contains statistics about a connection's message queue
type Filter ¶
Filter represents a realtime subscription filter Format: column=operator.value Examples:
- created_by=eq.user123
- priority=gt.5
- status=in.(queued,running)
- data->key=eq.value (JSONB path access)
- data->>key=eq.value (JSONB text access)
func ParseFilter ¶
ParseFilter parses a filter string Returns nil if filterStr is empty (no filter)
type GlobalBroadcast ¶
type GlobalBroadcast struct {
Channel string `json:"channel"`
TenantID string `json:"tenant_id,omitempty"`
Message ServerMessage `json:"message"`
}
GlobalBroadcast represents a message broadcast across instances
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener handles PostgreSQL LISTEN/NOTIFY and PubSub log events.
NOTE: This file has substantial duplication with listener_pool.go (PubSub log listeners, processLogEvent, processAllLogsEvent, enrichJobWithETA). Both implement the RealtimeListener interface. The enrichJobWithETA logic has been extracted to listener_helpers.go. A full unification of the remaining duplicated code is deferred to avoid risk.
func NewListener ¶
func NewListener(pool *pgxpool.Pool, handler *RealtimeHandler, subManager *SubscriptionManager, ps pubsub.PubSub) *Listener
NewListener creates a new PostgreSQL listener
type ListenerPool ¶
type ListenerPool struct {
// contains filtered or unexported fields
}
ListenerPool manages a pool of PostgreSQL LISTEN connections with parallel processing.
NOTE: This file has substantial duplication with listener.go (PubSub log listeners, processLogEvent, processAllLogsEvent, enrichJobWithETA). Both implement the RealtimeListener interface. The enrichJobWithETA logic has been extracted to listener_helpers.go. A full unification of the remaining duplicated code is deferred to avoid risk.
func NewListenerPool ¶
func NewListenerPool( pool *pgxpool.Pool, handler *RealtimeHandler, subManager *SubscriptionManager, ps pubsub.PubSub, config ListenerPoolConfig, ) *ListenerPool
NewListenerPool creates a new listener pool with the given configuration.
func (*ListenerPool) GetMetrics ¶
func (lp *ListenerPool) GetMetrics() ListenerPoolMetrics
GetMetrics returns current metrics.
func (*ListenerPool) Stop ¶
func (lp *ListenerPool) Stop()
Stop gracefully shuts down the listener pool.
type ListenerPoolConfig ¶
type ListenerPoolConfig struct {
PoolSize int // Number of LISTEN connections (default: 2)
WorkerCount int // Number of workers for processing (default: 4)
QueueSize int // Size of notification queue per worker (default: 1000)
RetryInterval time.Duration
MaxRetries int
}
ListenerPoolConfig holds configuration for the listener pool.
func DefaultListenerPoolConfig ¶
func DefaultListenerPoolConfig() ListenerPoolConfig
DefaultListenerPoolConfig returns sensible defaults.
type ListenerPoolMetrics ¶
type ListenerPoolMetrics struct {
ActiveConnections int32
NotificationsReceived uint64
NotificationsProcessed uint64
ConnectionFailures uint64
Reconnections uint64
QueueLength int
QueueCapacity int
}
ListenerPoolMetrics contains metrics for the listener pool.
type LogSubscription ¶
type LogSubscription struct {
ID string
ConnID string
ExecutionID string
ExecutionType string // "function", "job", "rpc"
}
LogSubscription represents a subscription to execution logs
type LogSubscriptionConfig ¶
type LogSubscriptionConfig struct {
ExecutionID string `json:"execution_id"`
Type string `json:"type"` // function, job, rpc
}
LogSubscriptionConfig represents the config for subscribe_logs messages
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages all WebSocket connections
func NewManager ¶
NewManager creates a new connection manager
func NewManagerWithConfig ¶
func NewManagerWithConfig(ctx context.Context, config ManagerConfig) *Manager
NewManagerWithConfig creates a new connection manager with full configuration
func (*Manager) AddConnection ¶
func (m *Manager) AddConnection(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string) (*Connection, error)
AddConnection adds a new WebSocket connection Returns nil and ErrMaxConnectionsReached if the connection limit is exceeded Returns nil and ErrMaxUserConnectionsReached if the per-user limit is exceeded Returns nil and ErrMaxIPConnectionsReached if the per-IP limit is exceeded (for anonymous)
func (*Manager) AddConnectionWithIP ¶
func (m *Manager) AddConnectionWithIP(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, remoteIP string) (*Connection, error)
AddConnectionWithIP adds a new WebSocket connection with explicit IP address This is useful when the IP is already known (e.g., from X-Forwarded-For header)
func (*Manager) BroadcastConnectionEvent ¶
func (m *Manager) BroadcastConnectionEvent(event ConnectionEvent)
BroadcastConnectionEvent broadcasts a connection event to the admin channel This allows admins to monitor connection lifecycle in real-time
func (*Manager) BroadcastGlobal ¶
func (m *Manager) BroadcastGlobal(channel string, tenantID string, message ServerMessage) error
func (*Manager) BroadcastToChannel ¶
func (m *Manager) BroadcastToChannel(channel string, tenantID string, message ServerMessage) int
BroadcastToChannel sends a message to all connections subscribed to a channel
func (*Manager) GetConnectionCount ¶
GetConnectionCount returns the total number of active connections
func (*Manager) GetConnectionsForStats ¶
func (m *Manager) GetConnectionsForStats() []ConnectionInfo
GetConnectionsForStats returns all connections as ConnectionInfo slice This is used by the stats handler which will enrich with emails
func (*Manager) GetDetailedStats ¶
GetDetailedStats returns detailed realtime statistics
func (*Manager) GetIPConnectionCount ¶
GetIPConnectionCount returns the number of connections for a specific IP
func (*Manager) GetSlowClientsDisconnected ¶
GetSlowClientsDisconnected returns the count of slow clients disconnected
func (*Manager) GetUserConnectionCount ¶
GetUserConnectionCount returns the number of connections for a specific user
func (*Manager) RemoveConnection ¶
RemoveConnection removes a WebSocket connection
func (*Manager) SetBaseConfig ¶
SetBaseConfig sets the base config for tenant-specific limit resolution
func (*Manager) SetConnectionLimits ¶
SetConnectionLimits updates the per-user and per-IP connection limits
func (*Manager) SetMaxConnections ¶
SetMaxConnections sets the maximum number of allowed connections
func (*Manager) SetMetrics ¶
func (m *Manager) SetMetrics(metrics *observability.Metrics)
SetMetrics sets the metrics instance for recording realtime metrics
type ManagerConfig ¶
type ManagerConfig struct {
MaxConnections int // Maximum total connections (0 = unlimited)
MaxConnectionsPerUser int // Maximum connections per user (0 = unlimited)
MaxConnectionsPerIP int // Maximum connections per IP for anonymous (0 = unlimited)
ClientMessageQueueSize int // Size of per-client message queue for async sending (0 = default)
SlowClientThreshold int // Queue length threshold for slow client detection (default: 100)
SlowClientTimeout time.Duration // Duration before disconnecting slow clients (default: 30s)
}
ManagerConfig holds configuration for the connection manager
type MessageType ¶
type MessageType string
MessageType represents the type of WebSocket message
const ( MessageTypeSubscribe MessageType = "subscribe" MessageTypeUnsubscribe MessageType = "unsubscribe" MessageTypeHeartbeat MessageType = "heartbeat" MessageTypeBroadcast MessageType = "broadcast" MessageTypePresence MessageType = "presence" MessageTypeError MessageType = "error" MessageTypeAck MessageType = "ack" MessageTypeChange MessageType = "postgres_changes" MessageTypeAccessToken MessageType = "access_token" MessageTypeSubscribeLogs MessageType = "subscribe_logs" // Subscribe to execution logs MessageTypeExecutionLog MessageType = "execution_log" // Execution log event from server MessageTypeSubscribeAllLogs MessageType = "subscribe_all_logs" // Subscribe to all logs (admin only) MessageTypeLogEntry MessageType = "log_entry" // Log entry event from server (all categories) )
type PostgresChangesConfig ¶
type PostgresChangesConfig struct {
Event string `json:"event"` // INSERT, UPDATE, DELETE, or *
Schema string `json:"schema"` // Database schema
Table string `json:"table"` // Table name
Filter string `json:"filter,omitempty"` // Optional filter: column=operator.value
}
PostgresChangesConfig represents the config object in postgres_changes subscriptions
type PresenceInfo ¶
type PresenceInfo struct {
Key string
State PresenceState
UserID *string
ConnID string
TenantID string
JoinedAt time.Time
}
PresenceInfo tracks presence information for a user/key
type PresenceManager ¶
type PresenceManager struct {
// contains filtered or unexported fields
}
PresenceManager manages presence tracking across channels
func NewPresenceManager ¶
func NewPresenceManager() *PresenceManager
NewPresenceManager creates a new presence manager
func (*PresenceManager) CleanupConnection ¶
func (pm *PresenceManager) CleanupConnection(connID string) map[string]*PresenceInfo
CleanupConnection removes all presence entries for a disconnected connection Returns a map of channel -> presence info that was removed
func (*PresenceManager) GetChannelPresenceCount ¶
func (pm *PresenceManager) GetChannelPresenceCount(channel string) int
GetChannelPresenceCount returns the number of active presences in a channel
func (*PresenceManager) GetPresenceState ¶
func (pm *PresenceManager) GetPresenceState(channel string) map[string][]PresenceState
GetPresenceState returns the current presence state for a channel Returns in the format expected by the SDK: map[key][]PresenceState
func (*PresenceManager) Track ¶
func (pm *PresenceManager) Track(channel, key string, state PresenceState, userID *string, connID string, tenantID string) (*PresenceInfo, bool)
Track adds or updates presence for a given channel and key
func (*PresenceManager) Untrack ¶
func (pm *PresenceManager) Untrack(channel, key, connID string) *PresenceInfo
Untrack removes presence for a given channel and key
type PresenceState ¶
type PresenceState map[string]interface{}
PresenceState represents the state data for a presence
type RLSCacheConfig ¶
type RLSCacheConfig struct {
MaxSize int // Maximum number of entries (0 = use default)
TTL time.Duration // Cache entry TTL (0 = use default)
}
RLSCacheConfig holds configuration for the RLS cache
type RealtimeHandler ¶
type RealtimeHandler struct {
// contains filtered or unexported fields
}
RealtimeHandler handles WebSocket connections
func NewRealtimeHandler ¶
func NewRealtimeHandler(manager *Manager, authService AuthService, subManager *SubscriptionManager) *RealtimeHandler
NewRealtimeHandler creates a new realtime handler
func (*RealtimeHandler) GetDetailedStats ¶
func (h *RealtimeHandler) GetDetailedStats() map[string]interface{}
GetDetailedStats returns detailed realtime statistics
func (*RealtimeHandler) GetManager ¶
func (h *RealtimeHandler) GetManager() *Manager
GetManager returns the realtime manager
func (*RealtimeHandler) GetStats ¶
func (h *RealtimeHandler) GetStats() map[string]interface{}
GetStats returns realtime statistics
func (*RealtimeHandler) HandleWebSocket ¶
func (h *RealtimeHandler) HandleWebSocket(c fiber.Ctx) error
HandleWebSocket handles WebSocket upgrade and communication
type RealtimeListener ¶
type RealtimeListener interface {
Start() error
Stop()
}
RealtimeListener is the interface for PostgreSQL LISTEN/NOTIFY handlers. Both the simple Listener and the pooled ListenerPool implement this interface.
type ServerMessage ¶
type ServerMessage struct {
Type MessageType `json:"type"`
Channel string `json:"channel,omitempty"`
Payload interface{} `json:"payload,omitempty"`
Error string `json:"error,omitempty"`
}
ServerMessage represents a message to the client
type Subscription ¶
type Subscription struct {
ID string
UserID string
Role string
Claims map[string]interface{} // Full JWT claims for RLS (includes custom claims like meeting_id, player_id)
Table string
Schema string
Event string // INSERT, UPDATE, DELETE, or * for all
Filter *Filter // Filter (column=operator.value)
ConnID string // Connection ID this subscription belongs to
}
Subscription represents an RLS-aware subscription to table changes
type SubscriptionDB ¶
type SubscriptionDB interface {
// IsTableRealtimeEnabled checks if a table is enabled for realtime in the schema registry.
IsTableRealtimeEnabled(ctx context.Context, schema, table string) (bool, error)
// CheckRLSAccess verifies if a user can access a record based on RLS policies.
// The claims map contains the full JWT claims to be passed to PostgreSQL for RLS evaluation.
CheckRLSAccess(ctx context.Context, schema, table, role string, claims map[string]interface{}, recordID interface{}) (bool, error)
// CheckRPCOwnership checks if a user owns an RPC execution.
CheckRPCOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
// CheckJobOwnership checks if a user owns a job execution.
CheckJobOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
// CheckFunctionOwnership checks if a user owns a function execution.
CheckFunctionOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
}
SubscriptionDB defines the database operations needed by SubscriptionManager. This interface allows for easier testing with mocks.
func NewPgxSubscriptionDB ¶
func NewPgxSubscriptionDB(pool *pgxpool.Pool) SubscriptionDB
NewPgxSubscriptionDB creates a SubscriptionDB backed by a pgx pool.
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages RLS-aware subscriptions
func NewSubscriptionManager ¶
func NewSubscriptionManager(db SubscriptionDB) *SubscriptionManager
NewSubscriptionManager creates a new subscription manager with default RLS cache settings. For production use, pass NewPgxSubscriptionDB(pool). For testing, pass a mock implementation of SubscriptionDB.
func NewSubscriptionManagerWithConfig ¶
func NewSubscriptionManagerWithConfig(db SubscriptionDB, cacheConfig RLSCacheConfig) *SubscriptionManager
NewSubscriptionManagerWithConfig creates a new subscription manager with custom RLS cache settings.
func (*SubscriptionManager) CheckExecutionOwnership ¶
func (sm *SubscriptionManager) CheckExecutionOwnership(ctx context.Context, executionID, userID, executionType string) (isOwner bool, exists bool, err error)
CheckExecutionOwnership verifies if a user owns the execution. Returns (isOwner, exists, error). executionType can be "rpc", "job", "function", or empty (will try all).
func (*SubscriptionManager) Close ¶
func (sm *SubscriptionManager) Close()
func (*SubscriptionManager) CreateAllLogsSubscription ¶
func (sm *SubscriptionManager) CreateAllLogsSubscription(subID, connID, category string, levels []string) (*AllLogsSubscription, error)
CreateAllLogsSubscription creates a subscription for all logs (admin only)
func (*SubscriptionManager) CreateLogSubscription ¶
func (sm *SubscriptionManager) CreateLogSubscription(subID, connID, executionID, executionType string) (*LogSubscription, error)
CreateLogSubscription creates a subscription for execution logs
func (*SubscriptionManager) CreateSubscription ¶
func (sm *SubscriptionManager) CreateSubscription( subID string, connID string, userID string, role string, claims map[string]interface{}, schema string, table string, event string, filterStr string, ) (*Subscription, error)
CreateSubscription creates a new RLS-aware subscription
func (*SubscriptionManager) FilterEventForSubscribers ¶
func (sm *SubscriptionManager) FilterEventForSubscribers(ctx context.Context, event *ChangeEvent) map[string]*ChangeEvent
FilterEventForSubscribers filters a change event for all subscribers with RLS
func (*SubscriptionManager) GetAllLogsSubscribers ¶
func (sm *SubscriptionManager) GetAllLogsSubscribers() map[string]*AllLogsSubscription
GetAllLogsSubscribers returns all connection IDs subscribed to all logs, along with their filter preferences
func (*SubscriptionManager) GetLogSubscribers ¶
func (sm *SubscriptionManager) GetLogSubscribers(executionID string) []string
GetLogSubscribers returns all connection IDs subscribed to an execution's logs
func (*SubscriptionManager) GetLogSubscriptionsByConnection ¶
func (sm *SubscriptionManager) GetLogSubscriptionsByConnection(connID string) []*LogSubscription
GetLogSubscriptionsByConnection returns all log subscriptions for a connection
func (*SubscriptionManager) GetStats ¶
func (sm *SubscriptionManager) GetStats() map[string]interface{}
GetStats returns subscription statistics
func (*SubscriptionManager) GetSubscriptionsByConnection ¶
func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription
GetSubscriptionsByConnection returns all subscriptions for a connection
func (*SubscriptionManager) RemoveAllLogsSubscription ¶
func (sm *SubscriptionManager) RemoveAllLogsSubscription(subID string) error
RemoveAllLogsSubscription removes an all-logs subscription
func (*SubscriptionManager) RemoveConnectionAllLogsSubscriptions ¶
func (sm *SubscriptionManager) RemoveConnectionAllLogsSubscriptions(connID string)
RemoveConnectionAllLogsSubscriptions removes all all-logs subscriptions for a connection
func (*SubscriptionManager) RemoveConnectionLogSubscriptions ¶
func (sm *SubscriptionManager) RemoveConnectionLogSubscriptions(connID string)
RemoveConnectionLogSubscriptions removes all log subscriptions for a connection
func (*SubscriptionManager) RemoveConnectionSubscriptions ¶
func (sm *SubscriptionManager) RemoveConnectionSubscriptions(connID string)
RemoveConnectionSubscriptions removes all subscriptions for a connection
func (*SubscriptionManager) RemoveLogSubscription ¶
func (sm *SubscriptionManager) RemoveLogSubscription(subID string) error
RemoveLogSubscription removes an execution log subscription
func (*SubscriptionManager) RemoveSubscription ¶
func (sm *SubscriptionManager) RemoveSubscription(subID string) error
RemoveSubscription removes a subscription
func (*SubscriptionManager) UpdateConnectionClaims ¶
func (sm *SubscriptionManager) UpdateConnectionClaims(connID string, newClaims map[string]interface{})
UpdateConnectionClaims updates the claims for all subscriptions belonging to a connection
func (*SubscriptionManager) UpdateConnectionRole ¶
func (sm *SubscriptionManager) UpdateConnectionRole(connID string, newRole string)
UpdateConnectionRole updates the role for all subscriptions belonging to a connection