realtime

package
v0.0.0-...-04478d6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2026 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultRLSCacheTTL     = 30 * time.Second // 30 seconds default
	DefaultRLSCacheMaxSize = 100000           // 100K entries default
)

Default RLS cache settings (used when no config provided)

View Source
const AdminConnectionsChannel = "realtime:admin:connections"

AdminConnectionsChannel is the channel name for broadcasting connection events to admins

View Source
const AllLogsChannel = "fluxbase:all_logs"

AllLogsChannel is the PubSub channel for all log notifications (admin streaming).

View Source
const DefaultMaxSubscriptions = 100
View Source
const DefaultMessageQueueSize = 256

DefaultMessageQueueSize is the default size of the per-connection message queue

View Source
const LogChannel = "fluxbase:logs"

LogChannel is the PubSub channel for execution log notifications.

View Source
const MaxSlowClientWarnings = 3

MaxSlowClientWarnings is the number of slow client warnings before marking unhealthy

View Source
const WriteTimeout = 10 * time.Second

WriteTimeout is the maximum time allowed to write a message to a WebSocket client

Variables

View Source
var ErrConnectionClosed = errors.New("connection is closed")

ErrConnectionClosed is returned when trying to send to a closed connection

View Source
var ErrMaxConnectionsReached = errors.New("maximum number of websocket connections reached")

ErrMaxConnectionsReached is returned when the maximum number of connections is reached

View Source
var ErrMaxIPConnectionsReached = errors.New("maximum number of websocket connections per IP reached")

ErrMaxIPConnectionsReached is returned when an IP has exceeded its connection limit

View Source
var ErrMaxUserConnectionsReached = errors.New("maximum number of websocket connections per user reached")

ErrMaxUserConnectionsReached is returned when a user has exceeded their connection limit

View Source
var ErrQueueFull = errors.New("message queue is full")

ErrQueueFull is returned when the message queue is full

View Source
var ErrSlowClient = errors.New("client is too slow to receive messages")

ErrSlowClient is returned when a client is too slow to receive messages

Functions

This section is empty.

Types

type AllLogsSubscription

type AllLogsSubscription struct {
	ID       string
	ConnID   string
	Category string   // Optional filter by category
	Levels   []string // Optional filter by levels
}

AllLogsSubscription represents a subscription to all logs (admin only)

type AuthService

type AuthService interface {
	ValidateToken(token string) (*TokenClaims, error)
}

AuthService interface for JWT validation (allows mocking in tests)

type AuthServiceAdapter

type AuthServiceAdapter struct {
	// contains filtered or unexported fields
}

AuthServiceAdapter adapts auth.Service to realtime.AuthService interface

func NewAuthServiceAdapter

func NewAuthServiceAdapter(service *auth.Service) *AuthServiceAdapter

NewAuthServiceAdapter creates a new auth service adapter

func (*AuthServiceAdapter) ValidateToken

func (a *AuthServiceAdapter) ValidateToken(token string) (*TokenClaims, error)

ValidateToken validates a JWT token and returns claims

type ChangeEvent

type ChangeEvent struct {
	Type      string                 `json:"type"`                 // INSERT, UPDATE, DELETE
	Table     string                 `json:"table"`                // Table name
	Schema    string                 `json:"schema"`               // Schema name
	Record    map[string]interface{} `json:"record"`               // New record data
	OldRecord map[string]interface{} `json:"old_record,omitempty"` // Old record data (for UPDATE/DELETE)
}

ChangeEvent represents a database change event

func ParseChangeEvent

func ParseChangeEvent(payload string) (*ChangeEvent, error)

ParseChangeEvent parses a JSON payload into a ChangeEvent

type ClientMessage

type ClientMessage struct {
	Type           MessageType     `json:"type"`
	Channel        string          `json:"channel,omitempty"`
	Event          string          `json:"event,omitempty"` // INSERT, UPDATE, DELETE, or *
	Schema         string          `json:"schema,omitempty"`
	Table          string          `json:"table,omitempty"`
	Filter         string          `json:"filter,omitempty"` // Filter: column=operator.value
	Payload        json.RawMessage `json:"payload,omitempty"`
	Config         json.RawMessage `json:"config,omitempty"` // Raw config - can be PostgresChangesConfig or LogSubscriptionConfig
	SubscriptionID string          `json:"subscription_id,omitempty"`
	MessageID      string          `json:"messageId,omitempty"` // Optional message ID for broadcast acknowledgements
	Token          string          `json:"token,omitempty"`     // JWT token for access_token message type
}

ClientMessage represents a message from the client

type Connection

type Connection struct {
	ID            string
	Conn          *websocket.Conn
	Subscriptions map[string]bool
	UserID        *string
	Role          string
	Claims        map[string]interface{}
	TenantID      string
	ConnectedAt   time.Time
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, parentCtx context.Context) *Connection

NewConnection creates a new WebSocket connection with an async message queue. If parentCtx is nil, context.Background() is used.

func NewConnectionSync

func NewConnectionSync(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}) *Connection

NewConnectionSync creates a new connection with synchronous sending (for tests)

func NewConnectionWithQueueSize

func NewConnectionWithQueueSize(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, queueSize int, parentCtx context.Context) *Connection

NewConnectionWithQueueSize creates a new WebSocket connection with custom queue size. If parentCtx is nil, context.Background() is used.

func (*Connection) AllowMessage

func (c *Connection) AllowMessage() bool

func (*Connection) Close

func (c *Connection) Close() error

Close closes the WebSocket connection and stops the writer goroutine

func (*Connection) Context

func (c *Connection) Context() context.Context

Context returns the connection's context for cancellation signaling. This allows external goroutines to detect when the connection is being shut down.

func (*Connection) GetQueueStats

func (c *Connection) GetQueueStats() ConnectionQueueStats

GetQueueStats returns statistics about the message queue

func (*Connection) GetRoleAndClaims

func (c *Connection) GetRoleAndClaims() (string, map[string]interface{})

func (*Connection) IsSlowClient

func (c *Connection) IsSlowClient() bool

IsSlowClient returns true if this connection has been marked as a slow client

func (*Connection) IsSubscribed

func (c *Connection) IsSubscribed(channel string) bool

IsSubscribed checks if the connection is subscribed to a channel

func (*Connection) SendMessage

func (c *Connection) SendMessage(msg interface{}) error

SendMessage queues a message to be sent to the WebSocket client asynchronously. Returns ErrQueueFull if the message queue is full (client is too slow). Returns ErrSlowClient if the client has been consistently too slow. Returns ErrConnectionClosed if the connection has been closed.

func (*Connection) Subscribe

func (c *Connection) Subscribe(channel string) bool

Subscribe adds a channel subscription for this connection

func (*Connection) Unsubscribe

func (c *Connection) Unsubscribe(channel string)

Unsubscribe removes a channel subscription for this connection

func (*Connection) UpdateAuth

func (c *Connection) UpdateAuth(userID *string, role string, claims map[string]interface{})

UpdateAuth updates the connection's authentication context

type ConnectionEvent

type ConnectionEvent struct {
	Type        ConnectionEventType `json:"type"`
	ID          string              `json:"id"`
	UserID      *string             `json:"user_id"`
	Email       *string             `json:"email"`
	DisplayName *string             `json:"display_name,omitempty"`
	RemoteAddr  string              `json:"remote_addr"`
	ConnectedAt string              `json:"connected_at"` // ISO 8601 format
	Timestamp   string              `json:"timestamp"`    // Event timestamp
}

ConnectionEvent represents a connection lifecycle event. It is broadcast to the admin channel when connections are established or terminated.

func NewConnectionEvent

func NewConnectionEvent(eventType ConnectionEventType, conn *Connection, email *string, displayName *string) ConnectionEvent

NewConnectionEvent creates a new connection event

func (ConnectionEvent) ToServerMessage

func (e ConnectionEvent) ToServerMessage() ServerMessage

ToServerMessage converts a ConnectionEvent to a ServerMessage for broadcasting

type ConnectionEventType

type ConnectionEventType string

ConnectionEventType represents the type of connection event

const (
	ConnectionEventConnected    ConnectionEventType = "connected"
	ConnectionEventDisconnected ConnectionEventType = "disconnected"
)

type ConnectionInfo

type ConnectionInfo struct {
	ID          string  `json:"id"`
	UserID      *string `json:"user_id"`
	Email       *string `json:"email"`
	DisplayName *string `json:"display_name,omitempty"`
	RemoteAddr  string  `json:"remote_addr"`
	ConnectedAt string  `json:"connected_at"`
}

ConnectionInfo represents detailed information about a connection

type ConnectionQueueStats

type ConnectionQueueStats struct {
	QueueLength     int
	QueueCapacity   int
	QueueHighWater  int32
	MessagesSent    uint64
	MessagesDropped uint64
	SlowClientCount int32
}

ConnectionQueueStats contains statistics about a connection's message queue

type Filter

type Filter struct {
	Column   string
	Operator string
	Value    string
}

Filter represents a realtime subscription filter. The format is column=operator.value, for example:

  • created_by=eq.user123
  • priority=gt.5
  • status=in.(queued,running)
  • data->key=eq.value (JSONB path access)
  • data->>key=eq.value (JSONB text access)

func ParseFilter

func ParseFilter(filterStr string) (*Filter, error)

ParseFilter parses a filter string. It returns nil if filterStr is empty (no filter).

func (*Filter) Matches

func (f *Filter) Matches(record map[string]interface{}) bool

Matches checks if a record matches this filter. It returns true if the filter is nil (no filtering) and false if the column doesn't exist in the record.

type GlobalBroadcast

type GlobalBroadcast struct {
	Channel  string        `json:"channel"`
	TenantID string        `json:"tenant_id,omitempty"`
	Message  ServerMessage `json:"message"`
}

GlobalBroadcast represents a message broadcast across instances

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener handles PostgreSQL LISTEN/NOTIFY and PubSub log events.

NOTE: This file has substantial duplication with listener_pool.go (PubSub log listeners, processLogEvent, processAllLogsEvent, enrichJobWithETA). Both implement the RealtimeListener interface. The enrichJobWithETA logic has been extracted to listener_helpers.go. A full unification of the remaining duplicated code is deferred to avoid risk.

func NewListener

func NewListener(pool *pgxpool.Pool, handler *RealtimeHandler, subManager *SubscriptionManager, ps pubsub.PubSub) *Listener

NewListener creates a new PostgreSQL listener

func (*Listener) Start

func (l *Listener) Start() error

Start begins listening for PostgreSQL notifications and PubSub log events

func (*Listener) Stop

func (l *Listener) Stop()

Stop stops the listener

type ListenerPool

type ListenerPool struct {
	// contains filtered or unexported fields
}

ListenerPool manages a pool of PostgreSQL LISTEN connections with parallel processing.

NOTE: This file has substantial duplication with listener.go (PubSub log listeners, processLogEvent, processAllLogsEvent, enrichJobWithETA). Both implement the RealtimeListener interface. The enrichJobWithETA logic has been extracted to listener_helpers.go. A full unification of the remaining duplicated code is deferred to avoid risk.

func NewListenerPool

func NewListenerPool(
	pool *pgxpool.Pool,
	handler *RealtimeHandler,
	subManager *SubscriptionManager,
	ps pubsub.PubSub,
	config ListenerPoolConfig,
) *ListenerPool

NewListenerPool creates a new listener pool with the given configuration.

func (*ListenerPool) GetMetrics

func (lp *ListenerPool) GetMetrics() ListenerPoolMetrics

GetMetrics returns current metrics.

func (*ListenerPool) Start

func (lp *ListenerPool) Start() error

Start begins the listener pool.

func (*ListenerPool) Stop

func (lp *ListenerPool) Stop()

Stop gracefully shuts down the listener pool.

type ListenerPoolConfig

type ListenerPoolConfig struct {
	PoolSize      int // Number of LISTEN connections (default: 2)
	WorkerCount   int // Number of workers for processing (default: 4)
	QueueSize     int // Size of notification queue per worker (default: 1000)
	RetryInterval time.Duration
	MaxRetries    int
}

ListenerPoolConfig holds configuration for the listener pool.

func DefaultListenerPoolConfig

func DefaultListenerPoolConfig() ListenerPoolConfig

DefaultListenerPoolConfig returns sensible defaults.

type ListenerPoolMetrics

type ListenerPoolMetrics struct {
	ActiveConnections      int32
	NotificationsReceived  uint64
	NotificationsProcessed uint64
	ConnectionFailures     uint64
	Reconnections          uint64
	QueueLength            int
	QueueCapacity          int
}

ListenerPoolMetrics contains metrics for the listener pool.

type LogSubscription

type LogSubscription struct {
	ID            string
	ConnID        string
	ExecutionID   string
	ExecutionType string // "function", "job", "rpc"
}

LogSubscription represents a subscription to execution logs

type LogSubscriptionConfig

type LogSubscriptionConfig struct {
	ExecutionID string `json:"execution_id"`
	Type        string `json:"type"` // function, job, rpc
}

LogSubscriptionConfig represents the config for subscribe_logs messages

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages all WebSocket connections

func NewManager

func NewManager(ctx context.Context) *Manager

NewManager creates a new connection manager

func NewManagerWithConfig

func NewManagerWithConfig(ctx context.Context, config ManagerConfig) *Manager

NewManagerWithConfig creates a new connection manager with full configuration

func (*Manager) AddConnection

func (m *Manager) AddConnection(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string) (*Connection, error)

AddConnection adds a new WebSocket connection. It returns nil and ErrMaxConnectionsReached if the connection limit is exceeded, nil and ErrMaxUserConnectionsReached if the per-user limit is exceeded, and nil and ErrMaxIPConnectionsReached if the per-IP limit is exceeded (for anonymous connections).

func (*Manager) AddConnectionWithIP

func (m *Manager) AddConnectionWithIP(id string, conn *websocket.Conn, userID *string, role string, claims map[string]interface{}, tenantID string, remoteIP string) (*Connection, error)

AddConnectionWithIP adds a new WebSocket connection with an explicit IP address. This is useful when the IP is already known (e.g., from the X-Forwarded-For header).

func (*Manager) BroadcastConnectionEvent

func (m *Manager) BroadcastConnectionEvent(event ConnectionEvent)

BroadcastConnectionEvent broadcasts a connection event to the admin channel. This allows admins to monitor the connection lifecycle in real time.

func (*Manager) BroadcastGlobal

func (m *Manager) BroadcastGlobal(channel string, tenantID string, message ServerMessage) error

func (*Manager) BroadcastToChannel

func (m *Manager) BroadcastToChannel(channel string, tenantID string, message ServerMessage) int

BroadcastToChannel sends a message to all connections subscribed to a channel

func (*Manager) GetConnectionCount

func (m *Manager) GetConnectionCount() int

GetConnectionCount returns the total number of active connections

func (*Manager) GetConnectionsForStats

func (m *Manager) GetConnectionsForStats() []ConnectionInfo

GetConnectionsForStats returns all connections as a ConnectionInfo slice. It is used by the stats handler, which enriches the entries with emails.

func (*Manager) GetDetailedStats

func (m *Manager) GetDetailedStats() map[string]interface{}

GetDetailedStats returns detailed realtime statistics

func (*Manager) GetIPConnectionCount

func (m *Manager) GetIPConnectionCount(ip string) int

GetIPConnectionCount returns the number of connections for a specific IP

func (*Manager) GetSlowClientsDisconnected

func (m *Manager) GetSlowClientsDisconnected() uint64

GetSlowClientsDisconnected returns the count of slow clients disconnected

func (*Manager) GetUserConnectionCount

func (m *Manager) GetUserConnectionCount(userID string) int

GetUserConnectionCount returns the number of connections for a specific user

func (*Manager) RemoveConnection

func (m *Manager) RemoveConnection(id string)

RemoveConnection removes a WebSocket connection

func (*Manager) SetBaseConfig

func (m *Manager) SetBaseConfig(cfg *config.Config)

SetBaseConfig sets the base config for tenant-specific limit resolution

func (*Manager) SetConnectionLimits

func (m *Manager) SetConnectionLimits(maxPerUser, maxPerIP int)

SetConnectionLimits updates the per-user and per-IP connection limits

func (*Manager) SetMaxConnections

func (m *Manager) SetMaxConnections(max int)

SetMaxConnections sets the maximum number of allowed connections

func (*Manager) SetMetrics

func (m *Manager) SetMetrics(metrics *observability.Metrics)

SetMetrics sets the metrics instance for recording realtime metrics

func (*Manager) SetPubSub

func (m *Manager) SetPubSub(ps pubsub.PubSub)

SetPubSub sets the pub/sub backend for cross-instance broadcasting. If set, BroadcastGlobal will publish messages to the pub/sub channel and this manager will subscribe to receive messages from other instances.

func (*Manager) Shutdown

func (m *Manager) Shutdown()

Shutdown gracefully shuts down the manager

type ManagerConfig

type ManagerConfig struct {
	MaxConnections         int           // Maximum total connections (0 = unlimited)
	MaxConnectionsPerUser  int           // Maximum connections per user (0 = unlimited)
	MaxConnectionsPerIP    int           // Maximum connections per IP for anonymous (0 = unlimited)
	ClientMessageQueueSize int           // Size of per-client message queue for async sending (0 = default)
	SlowClientThreshold    int           // Queue length threshold for slow client detection (default: 100)
	SlowClientTimeout      time.Duration // Duration before disconnecting slow clients (default: 30s)
}

ManagerConfig holds configuration for the connection manager

type MessageType

type MessageType string

MessageType represents the type of WebSocket message

const (
	MessageTypeSubscribe        MessageType = "subscribe"
	MessageTypeUnsubscribe      MessageType = "unsubscribe"
	MessageTypeHeartbeat        MessageType = "heartbeat"
	MessageTypeBroadcast        MessageType = "broadcast"
	MessageTypePresence         MessageType = "presence"
	MessageTypeError            MessageType = "error"
	MessageTypeAck              MessageType = "ack"
	MessageTypeChange           MessageType = "postgres_changes"
	MessageTypeAccessToken      MessageType = "access_token"
	MessageTypeSubscribeLogs    MessageType = "subscribe_logs"     // Subscribe to execution logs
	MessageTypeExecutionLog     MessageType = "execution_log"      // Execution log event from server
	MessageTypeSubscribeAllLogs MessageType = "subscribe_all_logs" // Subscribe to all logs (admin only)
	MessageTypeLogEntry         MessageType = "log_entry"          // Log entry event from server (all categories)
)

type PostgresChangesConfig

type PostgresChangesConfig struct {
	Event  string `json:"event"`            // INSERT, UPDATE, DELETE, or *
	Schema string `json:"schema"`           // Database schema
	Table  string `json:"table"`            // Table name
	Filter string `json:"filter,omitempty"` // Optional filter: column=operator.value
}

PostgresChangesConfig represents the config object in postgres_changes subscriptions

type PresenceInfo

type PresenceInfo struct {
	Key      string
	State    PresenceState
	UserID   *string
	ConnID   string
	TenantID string
	JoinedAt time.Time
}

PresenceInfo tracks presence information for a user/key

type PresenceManager

type PresenceManager struct {
	// contains filtered or unexported fields
}

PresenceManager manages presence tracking across channels

func NewPresenceManager

func NewPresenceManager() *PresenceManager

NewPresenceManager creates a new presence manager

func (*PresenceManager) CleanupConnection

func (pm *PresenceManager) CleanupConnection(connID string) map[string]*PresenceInfo

CleanupConnection removes all presence entries for a disconnected connection. It returns a map of channel -> presence info that was removed.

func (*PresenceManager) GetChannelPresenceCount

func (pm *PresenceManager) GetChannelPresenceCount(channel string) int

GetChannelPresenceCount returns the number of active presences in a channel

func (*PresenceManager) GetPresenceState

func (pm *PresenceManager) GetPresenceState(channel string) map[string][]PresenceState

GetPresenceState returns the current presence state for a channel, in the format expected by the SDK: map[key][]PresenceState.

func (*PresenceManager) Track

func (pm *PresenceManager) Track(channel, key string, state PresenceState, userID *string, connID string, tenantID string) (*PresenceInfo, bool)

Track adds or updates presence for a given channel and key

func (*PresenceManager) Untrack

func (pm *PresenceManager) Untrack(channel, key, connID string) *PresenceInfo

Untrack removes presence for a given channel and key

type PresenceState

type PresenceState map[string]interface{}

PresenceState represents the state data for a presence

type RLSCacheConfig

type RLSCacheConfig struct {
	MaxSize int           // Maximum number of entries (0 = use default)
	TTL     time.Duration // Cache entry TTL (0 = use default)
}

RLSCacheConfig holds configuration for the RLS cache

type RealtimeHandler

type RealtimeHandler struct {
	// contains filtered or unexported fields
}

RealtimeHandler handles WebSocket connections

func NewRealtimeHandler

func NewRealtimeHandler(manager *Manager, authService AuthService, subManager *SubscriptionManager) *RealtimeHandler

NewRealtimeHandler creates a new realtime handler

func (*RealtimeHandler) GetDetailedStats

func (h *RealtimeHandler) GetDetailedStats() map[string]interface{}

GetDetailedStats returns detailed realtime statistics

func (*RealtimeHandler) GetManager

func (h *RealtimeHandler) GetManager() *Manager

GetManager returns the realtime manager

func (*RealtimeHandler) GetStats

func (h *RealtimeHandler) GetStats() map[string]interface{}

GetStats returns realtime statistics

func (*RealtimeHandler) HandleWebSocket

func (h *RealtimeHandler) HandleWebSocket(c fiber.Ctx) error

HandleWebSocket handles WebSocket upgrade and communication

type RealtimeListener

type RealtimeListener interface {
	Start() error
	Stop()
}

RealtimeListener is the interface for PostgreSQL LISTEN/NOTIFY handlers. Both the simple Listener and the pooled ListenerPool implement this interface.

type ServerMessage

type ServerMessage struct {
	Type    MessageType `json:"type"`
	Channel string      `json:"channel,omitempty"`
	Payload interface{} `json:"payload,omitempty"`
	Error   string      `json:"error,omitempty"`
}

ServerMessage represents a message to the client

type Subscription

type Subscription struct {
	ID     string
	UserID string
	Role   string
	Claims map[string]interface{} // Full JWT claims for RLS (includes custom claims like meeting_id, player_id)
	Table  string
	Schema string
	Event  string  // INSERT, UPDATE, DELETE, or * for all
	Filter *Filter // Filter (column=operator.value)
	ConnID string  // Connection ID this subscription belongs to
}

Subscription represents an RLS-aware subscription to table changes

type SubscriptionDB

type SubscriptionDB interface {
	// IsTableRealtimeEnabled checks if a table is enabled for realtime in the schema registry.
	IsTableRealtimeEnabled(ctx context.Context, schema, table string) (bool, error)
	// CheckRLSAccess verifies if a user can access a record based on RLS policies.
	// The claims map contains the full JWT claims to be passed to PostgreSQL for RLS evaluation.
	CheckRLSAccess(ctx context.Context, schema, table, role string, claims map[string]interface{}, recordID interface{}) (bool, error)
	// CheckRPCOwnership checks if a user owns an RPC execution.
	CheckRPCOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
	// CheckJobOwnership checks if a user owns a job execution.
	CheckJobOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
	// CheckFunctionOwnership checks if a user owns a function execution.
	CheckFunctionOwnership(ctx context.Context, execID, userID uuid.UUID) (isOwner bool, exists bool, err error)
}

SubscriptionDB defines the database operations needed by SubscriptionManager. This interface allows for easier testing with mocks.

func NewPgxSubscriptionDB

func NewPgxSubscriptionDB(pool *pgxpool.Pool) SubscriptionDB

NewPgxSubscriptionDB creates a SubscriptionDB backed by a pgx pool.

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages RLS-aware subscriptions

func NewSubscriptionManager

func NewSubscriptionManager(db SubscriptionDB) *SubscriptionManager

NewSubscriptionManager creates a new subscription manager with default RLS cache settings. For production use, pass NewPgxSubscriptionDB(pool). For testing, pass a mock implementation of SubscriptionDB.

func NewSubscriptionManagerWithConfig

func NewSubscriptionManagerWithConfig(db SubscriptionDB, cacheConfig RLSCacheConfig) *SubscriptionManager

NewSubscriptionManagerWithConfig creates a new subscription manager with custom RLS cache settings.

func (*SubscriptionManager) CheckExecutionOwnership

func (sm *SubscriptionManager) CheckExecutionOwnership(ctx context.Context, executionID, userID, executionType string) (isOwner bool, exists bool, err error)

CheckExecutionOwnership verifies if a user owns the execution. Returns (isOwner, exists, error). executionType can be "rpc", "job", "function", or empty (will try all).

func (*SubscriptionManager) Close

func (sm *SubscriptionManager) Close()

func (*SubscriptionManager) CreateAllLogsSubscription

func (sm *SubscriptionManager) CreateAllLogsSubscription(subID, connID, category string, levels []string) (*AllLogsSubscription, error)

CreateAllLogsSubscription creates a subscription for all logs (admin only)

func (*SubscriptionManager) CreateLogSubscription

func (sm *SubscriptionManager) CreateLogSubscription(subID, connID, executionID, executionType string) (*LogSubscription, error)

CreateLogSubscription creates a subscription for execution logs

func (*SubscriptionManager) CreateSubscription

func (sm *SubscriptionManager) CreateSubscription(
	subID string,
	connID string,
	userID string,
	role string,
	claims map[string]interface{},
	schema string,
	table string,
	event string,
	filterStr string,
) (*Subscription, error)

CreateSubscription creates a new RLS-aware subscription

func (*SubscriptionManager) FilterEventForSubscribers

func (sm *SubscriptionManager) FilterEventForSubscribers(ctx context.Context, event *ChangeEvent) map[string]*ChangeEvent

FilterEventForSubscribers filters a change event for all subscribers with RLS

func (*SubscriptionManager) GetAllLogsSubscribers

func (sm *SubscriptionManager) GetAllLogsSubscribers() map[string]*AllLogsSubscription

GetAllLogsSubscribers returns all connection IDs subscribed to all logs, along with their filter preferences

func (*SubscriptionManager) GetLogSubscribers

func (sm *SubscriptionManager) GetLogSubscribers(executionID string) []string

GetLogSubscribers returns all connection IDs subscribed to an execution's logs

func (*SubscriptionManager) GetLogSubscriptionsByConnection

func (sm *SubscriptionManager) GetLogSubscriptionsByConnection(connID string) []*LogSubscription

GetLogSubscriptionsByConnection returns all log subscriptions for a connection

func (*SubscriptionManager) GetStats

func (sm *SubscriptionManager) GetStats() map[string]interface{}

GetStats returns subscription statistics

func (*SubscriptionManager) GetSubscriptionsByConnection

func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription

GetSubscriptionsByConnection returns all subscriptions for a connection

func (*SubscriptionManager) RemoveAllLogsSubscription

func (sm *SubscriptionManager) RemoveAllLogsSubscription(subID string) error

RemoveAllLogsSubscription removes an all-logs subscription

func (*SubscriptionManager) RemoveConnectionAllLogsSubscriptions

func (sm *SubscriptionManager) RemoveConnectionAllLogsSubscriptions(connID string)

RemoveConnectionAllLogsSubscriptions removes all all-logs subscriptions for a connection

func (*SubscriptionManager) RemoveConnectionLogSubscriptions

func (sm *SubscriptionManager) RemoveConnectionLogSubscriptions(connID string)

RemoveConnectionLogSubscriptions removes all log subscriptions for a connection

func (*SubscriptionManager) RemoveConnectionSubscriptions

func (sm *SubscriptionManager) RemoveConnectionSubscriptions(connID string)

RemoveConnectionSubscriptions removes all subscriptions for a connection

func (*SubscriptionManager) RemoveLogSubscription

func (sm *SubscriptionManager) RemoveLogSubscription(subID string) error

RemoveLogSubscription removes an execution log subscription

func (*SubscriptionManager) RemoveSubscription

func (sm *SubscriptionManager) RemoveSubscription(subID string) error

RemoveSubscription removes a subscription

func (*SubscriptionManager) UpdateConnectionClaims

func (sm *SubscriptionManager) UpdateConnectionClaims(connID string, newClaims map[string]interface{})

UpdateConnectionClaims updates the claims for all subscriptions belonging to a connection

func (*SubscriptionManager) UpdateConnectionRole

func (sm *SubscriptionManager) UpdateConnectionRole(connID string, newRole string)

UpdateConnectionRole updates the role for all subscriptions belonging to a connection

type TokenClaims

type TokenClaims struct {
	UserID    string
	Email     string
	Role      string
	SessionID string
	RawClaims map[string]interface{} // Full claims map for RLS (includes custom claims like meeting_id, player_id)
}

TokenClaims represents JWT claims

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL