internal

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PatternExact matches exact channel name
	PatternExact = "exact"

	// PatternWildcard allows * wildcards (e.g., "user.*.events")
	PatternWildcard = "wildcard"

	// PatternPrefix matches channel name prefix (e.g., "system.")
	PatternPrefix = "prefix"

	// PatternSuffix matches channel name suffix (e.g., ".events")
	PatternSuffix = "suffix"
)

Common channel patterns

View Source
const (
	NodeEventJoined  = "joined"
	NodeEventLeft    = "left"
	NodeEventUpdated = "updated"
)

Node change event types

View Source
const (
	ActivityTypePlaying   = "playing"
	ActivityTypeListening = "listening"
	ActivityTypeWatching  = "watching"
	ActivityTypeStreaming = "streaming"
	ActivityTypeCustom    = "custom"
)

Activity types

View Source
const (
	RoleOwner  = "owner"
	RoleAdmin  = "admin"
	RoleMember = "member"
	RoleGuest  = "guest"
)

Common room roles

View Source
const (
	PermissionSendMessage   = "send_message"
	PermissionDeleteMessage = "delete_message"
	PermissionInviteMembers = "invite_members"
	PermissionRemoveMembers = "remove_members"
	PermissionManageRoom    = "manage_room"
	PermissionManageRoles   = "manage_roles"
)

Common permissions

View Source
const (
	RoomEventCreated      = "created"
	RoomEventUpdated      = "updated"
	RoomEventDeleted      = "deleted"
	RoomEventMemberJoined = "member_joined"
	RoomEventMemberLeft   = "member_left"
	RoomEventMemberKicked = "member_kicked"
	RoomEventMemberBanned = "member_banned"
)

Room event types

View Source
const (
	ModerationEventBan      = "ban"
	ModerationEventUnban    = "unban"
	ModerationEventMute     = "mute"
	ModerationEventUnmute   = "unmute"
	ModerationEventKick     = "kick"
	ModerationEventWarn     = "warn"
	ModerationEventSlowMode = "slow_mode"
	ModerationEventLock     = "lock"
	ModerationEventUnlock   = "unlock"
)

Moderation event types

View Source
const (
	MessageTypeMessage  = "message"
	MessageTypePresence = "presence"
	MessageTypeTyping   = "typing"
	MessageTypeSystem   = "system"
	MessageTypeJoin     = "join"
	MessageTypeLeave    = "leave"
	MessageTypeError    = "error"
)

Message types

View Source
const (
	StatusOnline  = "online"
	StatusAway    = "away"
	StatusBusy    = "busy"
	StatusOffline = "offline"
)

Status constants

View Source
const (
	TypingEventStarted = "started"
	TypingEventStopped = "stopped"
)

Typing event types

Variables

View Source
var (
	// Connection errors
	ErrConnectionNotFound     = errors.New("connection not found")
	ErrConnectionClosed       = errors.New("connection closed")
	ErrConnectionLimitReached = errors.New("connection limit reached")
	ErrInvalidConnection      = errors.New("invalid connection")

	// Room errors
	ErrRoomNotFound      = errors.New("room not found")
	ErrRoomAlreadyExists = errors.New("room already exists")
	ErrRoomFull          = errors.New("room is full")
	ErrNotRoomMember     = errors.New("not a member of the room")
	ErrAlreadyRoomMember = errors.New("already a member of the room")
	ErrInvalidRoom       = errors.New("invalid room")
	ErrRoomLimitReached  = errors.New("room limit reached")

	// Channel errors
	ErrChannelNotFound      = errors.New("channel not found")
	ErrChannelAlreadyExists = errors.New("channel already exists")
	ErrNotSubscribed        = errors.New("not subscribed to channel")
	ErrAlreadySubscribed    = errors.New("already subscribed to channel")
	ErrInvalidChannel       = errors.New("invalid channel")

	// Permission errors
	ErrPermissionDenied  = errors.New("permission denied")
	ErrInvalidPermission = errors.New("invalid permission")
	ErrInsufficientRole  = errors.New("insufficient role")

	// Message errors
	ErrMessageTooLarge = errors.New("message too large")
	ErrInvalidMessage  = errors.New("invalid message")
	ErrMessageNotFound = errors.New("message not found")

	// Presence errors
	ErrPresenceNotFound = errors.New("presence not found")
	ErrInvalidStatus    = errors.New("invalid status")

	// Invite errors
	ErrInviteNotFound = errors.New("invite not found")
	ErrInviteExpired  = errors.New("invite expired")

	// Backend errors
	ErrBackendNotConnected = errors.New("backend not connected")
	ErrBackendTimeout      = errors.New("backend operation timeout")
	ErrBackendUnavailable  = errors.New("backend unavailable")

	// Configuration errors
	ErrInvalidConfig = errors.New("invalid configuration")
	ErrMissingConfig = errors.New("missing required configuration")

	// Distributed errors
	ErrNodeNotFound          = errors.New("node not found")
	ErrLockAcquisitionFailed = errors.New("failed to acquire lock")
	ErrLockNotHeld           = errors.New("lock not held")
)

Common errors

Functions

func NewBackendError

func NewBackendError(backend, op string, err error) error

func NewChannelError

func NewChannelError(channelID, op string, err error) error

func NewConnectionError

func NewConnectionError(connID, op string, err error) error

func NewMessageError

func NewMessageError(messageID, op string, err error) error

func NewRoomError

func NewRoomError(roomID, userID, op string, err error) error

Types

type ActivityInfo

type ActivityInfo struct {
	Type      string         `json:"type"` // "playing", "listening", "watching", "streaming", "custom"
	Name      string         `json:"name"`
	Details   string         `json:"details,omitempty"`
	State     string         `json:"state,omitempty"`
	StartTime time.Time      `json:"start_time,omitempty"`
	EndTime   *time.Time     `json:"end_time,omitempty"`
	Assets    map[string]any `json:"assets,omitempty"` // Images, icons, etc.
	Metadata  map[string]any `json:"metadata,omitempty"`
}

ActivityInfo represents rich activity/presence information (Discord-style).

type AnalyticsEvent

type AnalyticsEvent struct {
	Type      string         `json:"type"`
	UserID    string         `json:"user_id,omitempty"`
	RoomID    string         `json:"room_id,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
}

AnalyticsEvent represents an event for analytics tracking.

type AnalyticsQuery

type AnalyticsQuery struct {
	Metric    string         `json:"metric"`
	StartTime time.Time      `json:"start_time"`
	EndTime   time.Time      `json:"end_time"`
	GroupBy   string         `json:"group_by,omitempty"`
	Filters   map[string]any `json:"filters,omitempty"`
	Limit     int            `json:"limit,omitempty"`
}

AnalyticsQuery represents a query for analytics data.

type AnalyticsResult

type AnalyticsResult struct {
	Metric    string            `json:"metric"`
	Value     float64           `json:"value"`
	Timestamp time.Time         `json:"timestamp"`
	Labels    map[string]string `json:"labels,omitempty"`
	Metadata  map[string]any    `json:"metadata,omitempty"`
}

AnalyticsResult represents the result of an analytics query.

type Availability

type Availability struct {
	Available bool      `json:"available"`
	Message   string    `json:"message,omitempty"`
	UpdatedAt time.Time `json:"updated_at"`
}

Availability represents user availability status.

type BackendError

type BackendError struct {
	Backend string
	Op      string
	Err     error
}

BackendError wraps backend-related errors with context.

func (*BackendError) Error

func (e *BackendError) Error() string

func (*BackendError) Unwrap

func (e *BackendError) Unwrap() error

type Channel

type Channel interface {
	// Identity
	GetID() string
	GetName() string
	GetCreated() time.Time
	GetMessageCount() int64

	// Subscription management
	Subscribe(ctx context.Context, sub Subscription) error
	Unsubscribe(ctx context.Context, connID string) error
	GetSubscribers(ctx context.Context) ([]Subscription, error)
	GetSubscriberCount(ctx context.Context) (int, error)
	IsSubscribed(ctx context.Context, connID string) (bool, error)

	// Publishing
	Publish(ctx context.Context, message *Message) error

	// Lifecycle
	Delete(ctx context.Context) error
}

Channel represents a pub/sub topic for message broadcasting.

type ChannelError

type ChannelError struct {
	ChannelID string
	Op        string
	Err       error
}

ChannelError wraps channel-related errors with context.

func (*ChannelError) Error

func (e *ChannelError) Error() string

func (*ChannelError) Unwrap

func (e *ChannelError) Unwrap() error

type ChannelOptions

type ChannelOptions struct {
	ID          string         `json:"id,omitempty"`
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Private     bool           `json:"private"`
	Persistent  bool           `json:"persistent"` // Persist messages for late subscribers
	Metadata    map[string]any `json:"metadata,omitempty"`
}

ChannelOptions contains configuration for creating a channel.

type ChannelPattern

type ChannelPattern interface {
	// Match checks if a channel name matches the pattern
	Match(channelName string) bool

	// GetPattern returns the pattern string
	GetPattern() string
}

ChannelPattern represents pattern-matching for channel subscriptions.

type ChannelStore

type ChannelStore interface {
	// CRUD operations
	Create(ctx context.Context, channel Channel) error
	Get(ctx context.Context, channelID string) (Channel, error)
	Delete(ctx context.Context, channelID string) error
	List(ctx context.Context) ([]Channel, error)
	Exists(ctx context.Context, channelID string) (bool, error)

	// Subscription operations
	AddSubscription(ctx context.Context, channelID string, sub Subscription) error
	RemoveSubscription(ctx context.Context, channelID, connID string) error
	GetSubscriptions(ctx context.Context, channelID string) ([]Subscription, error)
	GetSubscriberCount(ctx context.Context, channelID string) (int, error)
	IsSubscribed(ctx context.Context, channelID, connID string) (bool, error)

	// Publishing (for distributed backends)
	Publish(ctx context.Context, channelID string, message *Message) error

	// User's channels
	GetUserChannels(ctx context.Context, userID string) ([]Channel, error)

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}

ChannelStore provides backend storage for channels.

type ClusterStats

type ClusterStats struct {
	TotalNodes        int            `json:"total_nodes"`
	ActiveNodes       int            `json:"active_nodes"`
	TotalConnections  int            `json:"total_connections"`
	TotalRooms        int            `json:"total_rooms"`
	TotalChannels     int            `json:"total_channels"`
	MessagesPerSecond float64        `json:"messages_per_second"`
	Nodes             []NodeInfo     `json:"nodes"`
	Uptime            time.Duration  `json:"uptime"`
	Extra             map[string]any `json:"extra,omitempty"`
}

ClusterStats provides statistics about the streaming cluster.

type Config

type Config struct {
	// Backend configuration
	Backend         string   `json:"backend" yaml:"backend"`                   // "local", "redis", "nats"
	BackendURLs     []string `json:"backend_urls" yaml:"backend_urls"`         // Connection URLs
	BackendUsername string   `json:"backend_username" yaml:"backend_username"` // Authentication username
	BackendPassword string   `json:"backend_password" yaml:"backend_password"` // Authentication password

	// Feature toggles
	EnableRooms            bool `json:"enable_rooms" yaml:"enable_rooms"`
	EnableChannels         bool `json:"enable_channels" yaml:"enable_channels"`
	EnablePresence         bool `json:"enable_presence" yaml:"enable_presence"`
	EnableTypingIndicators bool `json:"enable_typing_indicators" yaml:"enable_typing_indicators"`
	EnableMessageHistory   bool `json:"enable_message_history" yaml:"enable_message_history"`
	EnableDistributed      bool `json:"enable_distributed" yaml:"enable_distributed"`

	// Connection limits
	MaxConnectionsPerUser int `json:"max_connections_per_user" yaml:"max_connections_per_user"`
	MaxRoomsPerUser       int `json:"max_rooms_per_user" yaml:"max_rooms_per_user"`
	MaxChannelsPerUser    int `json:"max_channels_per_user" yaml:"max_channels_per_user"`
	MaxMessageSize        int `json:"max_message_size" yaml:"max_message_size"` // Bytes
	MaxMessagesPerSecond  int `json:"max_messages_per_second" yaml:"max_messages_per_second"`

	// Timeouts
	PingInterval    time.Duration `json:"ping_interval" yaml:"ping_interval"`
	PongTimeout     time.Duration `json:"pong_timeout" yaml:"pong_timeout"`
	WriteTimeout    time.Duration `json:"write_timeout" yaml:"write_timeout"`
	ReadBufferSize  int           `json:"read_buffer_size" yaml:"read_buffer_size"`
	WriteBufferSize int           `json:"write_buffer_size" yaml:"write_buffer_size"`

	// Message persistence
	MessageRetention   time.Duration `json:"message_retention" yaml:"message_retention"`
	MessageCleanup     time.Duration `json:"message_cleanup" yaml:"message_cleanup"`
	MaxMessagesPerRoom int64         `json:"max_messages_per_room" yaml:"max_messages_per_room"`

	// Presence settings
	PresenceTimeout time.Duration `json:"presence_timeout" yaml:"presence_timeout"`
	PresenceCleanup time.Duration `json:"presence_cleanup" yaml:"presence_cleanup"`

	// Typing settings
	TypingTimeout         time.Duration `json:"typing_timeout" yaml:"typing_timeout"`
	TypingCleanup         time.Duration `json:"typing_cleanup" yaml:"typing_cleanup"`
	MaxTypingUsersPerRoom int           `json:"max_typing_users_per_room" yaml:"max_typing_users_per_room"`

	// Distributed settings
	NodeID            string        `json:"node_id" yaml:"node_id"`
	HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
	NodeTimeout       time.Duration `json:"node_timeout" yaml:"node_timeout"`

	// TLS
	TLSEnabled  bool   `json:"tls_enabled" yaml:"tls_enabled"`
	TLSCertFile string `json:"tls_cert_file" yaml:"tls_cert_file"`
	TLSKeyFile  string `json:"tls_key_file" yaml:"tls_key_file"`
	TLSCAFile   string `json:"tls_ca_file" yaml:"tls_ca_file"`

	// Misc
	RequireConfig bool `json:"-" yaml:"-"` // Internal flag for config loading
}

Config contains all configuration for the streaming extension.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config.

func WithAuthentication

func WithAuthentication(username, password string) ConfigOption

WithAuthentication sets backend authentication credentials.

func WithBackend

func WithBackend(backend string) ConfigOption

WithBackend sets the backend type.

func WithBackendURLs

func WithBackendURLs(urls ...string) ConfigOption

WithBackendURLs sets the backend connection URLs.

func WithBufferSizes

func WithBufferSizes(read, write int) ConfigOption

WithBufferSizes sets buffer sizes.

func WithConfig

func WithConfig(config Config) ConfigOption

WithConfig sets the complete configuration.

func WithConnectionLimits

func WithConnectionLimits(perUser, roomsPerUser, channelsPerUser int) ConfigOption

WithConnectionLimits sets connection limits.

func WithFeatures

func WithFeatures(rooms, channels, presence, typing, history bool) ConfigOption

WithFeatures enables/disables features.

func WithLocalBackend

func WithLocalBackend() ConfigOption

WithLocalBackend configures local in-memory backend.

func WithMessageLimits

func WithMessageLimits(maxSize, maxPerSecond int) ConfigOption

WithMessageLimits sets message limits.

func WithMessageRetention

func WithMessageRetention(retention time.Duration) ConfigOption

WithMessageRetention sets message retention period.

func WithNATSBackend

func WithNATSBackend(urls ...string) ConfigOption

WithNATSBackend configures NATS backend.

func WithNodeID

func WithNodeID(nodeID string) ConfigOption

WithNodeID sets the node ID for distributed mode.

func WithPresenceTimeout

func WithPresenceTimeout(timeout time.Duration) ConfigOption

WithPresenceTimeout sets presence timeout.

func WithRedisBackend

func WithRedisBackend(url string) ConfigOption

WithRedisBackend configures Redis backend.

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

WithRequireConfig requires configuration from ConfigManager.

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

WithTLS enables TLS with certificate files.

func WithTimeouts

func WithTimeouts(ping, pong, write time.Duration) ConfigOption

WithTimeouts sets connection timeouts.

func WithTypingTimeout

func WithTypingTimeout(timeout time.Duration) ConfigOption

WithTypingTimeout sets typing indicator timeout.

type ConnectionError

type ConnectionError struct {
	ConnID string
	Op     string
	Err    error
}

ConnectionError wraps connection-related errors with context.

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

type ConnectionInfo

type ConnectionInfo struct {
	ID            string         `json:"id"`
	UserID        string         `json:"user_id"`
	SessionID     string         `json:"session_id"`
	ConnectedAt   time.Time      `json:"connected_at"`
	LastActivity  time.Time      `json:"last_activity"`
	RoomsJoined   []string       `json:"rooms_joined"`
	Subscriptions []string       `json:"subscriptions"`
	IPAddress     string         `json:"ip_address,omitempty"`
	UserAgent     string         `json:"user_agent,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`
}

ConnectionInfo provides detailed information about a connection.

type DeviceInfo

type DeviceInfo struct {
	DeviceID string         `json:"device_id"`
	Type     string         `json:"type"` // "mobile", "desktop", "tablet", "web"
	OS       string         `json:"os,omitempty"`
	Browser  string         `json:"browser,omitempty"`
	LastSeen time.Time      `json:"last_seen"`
	IP       string         `json:"ip,omitempty"`
	Active   bool           `json:"active"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

DeviceInfo represents information about a user's device.

type DistributedBackend

type DistributedBackend interface {
	// Node coordination
	RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error
	UnregisterNode(ctx context.Context, nodeID string) error
	GetNodes(ctx context.Context) ([]NodeInfo, error)
	GetNode(ctx context.Context, nodeID string) (*NodeInfo, error)
	Heartbeat(ctx context.Context, nodeID string) error

	// Message broadcasting across nodes
	Publish(ctx context.Context, channel string, message *Message) error
	Subscribe(ctx context.Context, channel string, handler MessageHandler) error
	Unsubscribe(ctx context.Context, channel string) error

	// Distributed presence
	SetPresence(ctx context.Context, userID, status string, ttl time.Duration) error
	GetPresence(ctx context.Context, userID string) (*UserPresence, error)
	GetOnlineUsers(ctx context.Context) ([]string, error)

	// Distributed locks (for exclusive operations)
	AcquireLock(ctx context.Context, key string, ttl time.Duration) (Lock, error)
	ReleaseLock(ctx context.Context, lock Lock) error

	// Distributed counters
	Increment(ctx context.Context, key string) (int64, error)
	Decrement(ctx context.Context, key string) (int64, error)
	GetCounter(ctx context.Context, key string) (int64, error)

	// Node discovery
	DiscoverNodes(ctx context.Context) ([]NodeInfo, error)
	WatchNodes(ctx context.Context, handler NodeChangeHandler) error

	// Health check
	Ping(ctx context.Context) error

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
}

DistributedBackend provides cross-node coordination for distributed streaming.

type DistributedBackendOptions

type DistributedBackendOptions struct {
	// Backend type: "redis", "nats", ""
	Type string

	// Connection URLs
	URLs []string

	// Node ID (unique identifier for this instance)
	NodeID string

	// Heartbeat interval
	HeartbeatInterval time.Duration

	// Node timeout (consider dead after this)
	NodeTimeout time.Duration

	// Reconnect settings
	MaxReconnectAttempts int
	ReconnectBackoff     time.Duration

	// TLS configuration
	TLSEnabled  bool
	TLSCertFile string
	TLSKeyFile  string
	TLSCAFile   string

	// Authentication
	Username string
	Password string

	// Additional options
	Options map[string]any
}

DistributedBackendOptions contains configuration for distributed backend.

func DefaultDistributedBackendOptions

func DefaultDistributedBackendOptions() DistributedBackendOptions

DefaultDistributedBackendOptions returns default distributed backend options.

type EnhancedConnection

type EnhancedConnection interface {
	forge.Connection

	// Metadata
	GetUserID() string
	SetUserID(userID string)
	GetSessionID() string
	SetSessionID(sessionID string)
	GetMetadata(key string) (any, bool)
	SetMetadata(key string, value any)

	// Tracking
	GetJoinedRooms() []string
	AddRoom(roomID string)
	RemoveRoom(roomID string)
	IsInRoom(roomID string) bool

	GetSubscriptions() []string
	AddSubscription(channelID string)
	RemoveSubscription(channelID string)
	IsSubscribed(channelID string) bool

	GetLastActivity() time.Time
	UpdateActivity()

	// State
	IsClosed() bool
	MarkClosed()
}

EnhancedConnection wraps the core forge.Connection with streaming metadata.

type FileInfo

type FileInfo struct {
	ID           string         `json:"id"`
	Filename     string         `json:"filename"`
	ContentType  string         `json:"content_type"`
	Size         int64          `json:"size"`
	UserID       string         `json:"user_id"`
	RoomID       string         `json:"room_id,omitempty"`
	URL          string         `json:"url"`
	ThumbnailURL string         `json:"thumbnail_url,omitempty"`
	Metadata     map[string]any `json:"metadata,omitempty"`
	UploadedAt   time.Time      `json:"uploaded_at"`
	ExpiresAt    *time.Time     `json:"expires_at,omitempty"`
}

FileInfo represents information about an uploaded file.

type FileQuery

type FileQuery struct {
	UserID      string         `json:"user_id,omitempty"`
	RoomID      string         `json:"room_id,omitempty"`
	ContentType string         `json:"content_type,omitempty"`
	Before      *time.Time     `json:"before,omitempty"`
	After       *time.Time     `json:"after,omitempty"`
	Limit       int            `json:"limit,omitempty"`
	Offset      int            `json:"offset,omitempty"`
	Filters     map[string]any `json:"filters,omitempty"`
}

FileQuery represents a query for searching files.

type FileUpload

type FileUpload struct {
	ID          string         `json:"id"`
	Filename    string         `json:"filename"`
	ContentType string         `json:"content_type"`
	Size        int64          `json:"size"`
	UserID      string         `json:"user_id"`
	RoomID      string         `json:"room_id,omitempty"`
	Data        []byte         `json:"data,omitempty"`
	URL         string         `json:"url,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	UploadedAt  time.Time      `json:"uploaded_at"`
}

FileUpload represents a file upload request.

type HistoryQuery

type HistoryQuery struct {
	Limit      int       // Maximum number of messages
	Before     time.Time // Messages before this timestamp
	After      time.Time // Messages after this timestamp
	ThreadID   string    // Filter by thread
	UserID     string    // Filter by user
	SearchTerm string    // Full-text search
}

HistoryQuery defines parameters for retrieving message history.

type Invite

type Invite struct {
	Code      string         `json:"code"`
	RoomID    string         `json:"room_id"`
	CreatedBy string         `json:"created_by"`
	CreatedAt time.Time      `json:"created_at"`
	ExpiresAt *time.Time     `json:"expires_at,omitempty"`
	MaxUses   int            `json:"max_uses"`
	UsedCount int            `json:"used_count"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

Invite represents a room invitation.

type InviteOptions

type InviteOptions struct {
	Duration time.Duration  `json:"duration,omitempty"`
	MaxUses  int            `json:"max_uses,omitempty"`
	Role     string         `json:"role,omitempty"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

InviteOptions contains configuration for creating a room invitation.

type Lock

type Lock interface {
	GetKey() string
	GetOwner() string
	GetExpiry() time.Time
	Renew(ctx context.Context, ttl time.Duration) error
	IsHeld() bool
}

Lock represents a distributed lock.

type Manager

type Manager interface {
	// Connection management
	Register(conn EnhancedConnection) error
	Unregister(connID string) error
	GetConnection(connID string) (EnhancedConnection, error)
	GetUserConnections(userID string) []EnhancedConnection
	GetAllConnections() []EnhancedConnection
	ConnectionCount() int

	// Advanced Connection Management
	GetConnectionsByStatus(status string) []EnhancedConnection
	KickConnection(ctx context.Context, connID string, reason string) error
	GetConnectionInfo(connID string) (*ConnectionInfo, error)
	GetIdleConnections(idleFor time.Duration) []EnhancedConnection
	CleanupIdleConnections(ctx context.Context, idleFor time.Duration) (int, error)

	// Room operations
	CreateRoom(ctx context.Context, room Room) error
	GetRoom(ctx context.Context, roomID string) (Room, error)
	DeleteRoom(ctx context.Context, roomID string) error
	JoinRoom(ctx context.Context, connID, roomID string) error
	LeaveRoom(ctx context.Context, connID, roomID string) error
	GetRoomMembers(ctx context.Context, roomID string) ([]Member, error)
	ListRooms(ctx context.Context) ([]Room, error)

	// Room Management Extended
	UpdateRoom(ctx context.Context, roomID string, updates map[string]any) error
	SearchRooms(ctx context.Context, query string, filters map[string]any) ([]Room, error)
	GetPublicRooms(ctx context.Context, limit int) ([]Room, error)
	GetUserRoomCount(ctx context.Context, userID string) (int, error)
	ArchiveRoom(ctx context.Context, roomID string) error
	RestoreRoom(ctx context.Context, roomID string) error
	TransferRoomOwnership(ctx context.Context, roomID, newOwnerID string) error

	// Channel operations
	CreateChannel(ctx context.Context, channel Channel) error
	GetChannel(ctx context.Context, channelID string) (Channel, error)
	DeleteChannel(ctx context.Context, channelID string) error
	Subscribe(ctx context.Context, connID, channelID string, filters map[string]any) error
	Unsubscribe(ctx context.Context, connID, channelID string) error
	ListChannels(ctx context.Context) ([]Channel, error)

	// Channel Management Extended
	UpdateChannel(ctx context.Context, channelID string, updates map[string]any) error
	GetChannelSubscribers(ctx context.Context, channelID string) ([]string, error)
	GetUserChannels(ctx context.Context, userID string) ([]Channel, error)

	// Message broadcasting
	Broadcast(ctx context.Context, message *Message) error
	BroadcastToRoom(ctx context.Context, roomID string, message *Message) error
	BroadcastToChannel(ctx context.Context, channelID string, message *Message) error
	SendToUser(ctx context.Context, userID string, message *Message) error
	SendToConnection(ctx context.Context, connID string, message *Message) error

	// Bulk Broadcasting
	BroadcastToUsers(ctx context.Context, userIDs []string, message *Message) error
	BroadcastToRooms(ctx context.Context, roomIDs []string, message *Message) error
	BroadcastExcept(ctx context.Context, message *Message, excludeUserIDs []string) error
	BulkJoinRoom(ctx context.Context, connIDs []string, roomID string) error

	// Presence operations
	SetPresence(ctx context.Context, userID, status string) error
	GetPresence(ctx context.Context, userID string) (*UserPresence, error)
	GetOnlineUsers(ctx context.Context, roomID string) ([]string, error)
	TrackActivity(ctx context.Context, userID string) error

	// Presence Extended
	GetPresenceForUsers(ctx context.Context, userIDs []string) ([]*UserPresence, error)
	SetCustomStatus(ctx context.Context, userID, customStatus string) error
	GetOnlineCount(ctx context.Context) (int, error)
	GetPresenceInRooms(ctx context.Context, roomIDs []string) (map[string][]string, error)

	// Typing operations
	StartTyping(ctx context.Context, userID, roomID string) error
	StopTyping(ctx context.Context, userID, roomID string) error
	GetTypingUsers(ctx context.Context, roomID string) ([]string, error)

	// Typing Extended
	GetTypingUsersInChannels(ctx context.Context, channelIDs []string) (map[string][]string, error)
	IsTyping(ctx context.Context, userID, roomID string) (bool, error)
	ClearTyping(ctx context.Context, userID string) error

	// Message history
	SaveMessage(ctx context.Context, message *Message) error
	GetHistory(ctx context.Context, roomID string, query HistoryQuery) ([]*Message, error)

	// Message History Extended
	GetThreadHistory(ctx context.Context, roomID, threadID string, query HistoryQuery) ([]*Message, error)
	GetUserMessages(ctx context.Context, userID string, query HistoryQuery) ([]*Message, error)
	SearchMessages(ctx context.Context, roomID, searchTerm string, query HistoryQuery) ([]*Message, error)
	DeleteMessage(ctx context.Context, messageID string) error
	EditMessage(ctx context.Context, messageID string, newContent any) error

	// Reactions/Interactions
	AddReaction(ctx context.Context, messageID, userID, emoji string) error
	RemoveReaction(ctx context.Context, messageID, userID, emoji string) error
	GetReactions(ctx context.Context, messageID string) (map[string][]string, error)

	// Moderation
	MuteUser(ctx context.Context, userID, roomID string, duration time.Duration) error
	UnmuteUser(ctx context.Context, userID, roomID string) error
	BanUser(ctx context.Context, userID, roomID string, reason string, until *time.Time) error
	UnbanUser(ctx context.Context, userID, roomID string) error
	GetModerationLog(ctx context.Context, roomID string, limit int) ([]ModerationEvent, error)

	// Rate Limiting
	CheckRateLimit(ctx context.Context, userID string, action string) (bool, error)
	GetRateLimitStatus(ctx context.Context, userID string) (*RateLimitStatus, error)

	// Statistics & Monitoring
	GetStats(ctx context.Context) (*ManagerStats, error)
	GetRoomStats(ctx context.Context, roomID string) (*RoomStats, error)
	GetUserStats(ctx context.Context, userID string) (*UserStats, error)
	GetActiveRooms(ctx context.Context, since time.Duration) ([]Room, error)

	// Direct Messaging
	CreateDirectMessage(ctx context.Context, fromUserID, toUserID string) (string, error)
	GetDirectMessages(ctx context.Context, userID string) ([]Room, error)
	IsDirectMessage(ctx context.Context, roomID string) (bool, error)

	// Lifecycle
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Health(ctx context.Context) error
}

Manager is the central interface for managing streaming connections, rooms, channels, presence, and message broadcasting.

type ManagerStats

type ManagerStats struct {
	TotalConnections int           `json:"total_connections"`
	TotalRooms       int           `json:"total_rooms"`
	TotalChannels    int           `json:"total_channels"`
	TotalMessages    int64         `json:"total_messages"`
	OnlineUsers      int           `json:"online_users"`
	MessagesPerSec   float64       `json:"messages_per_sec"`
	Uptime           time.Duration `json:"uptime"`
	MemoryUsage      int64         `json:"memory_usage,omitempty"`
}

ManagerStats provides overall streaming statistics.

type Member

type Member interface {
	GetUserID() string
	GetRole() string
	GetJoinedAt() time.Time
	GetPermissions() []string

	SetRole(role string)
	HasPermission(permission string) bool
	GrantPermission(permission string)
	RevokePermission(permission string)
	GetMetadata() map[string]any
	SetMetadata(key string, value any)
}

Member represents a room member with role and permissions.

type MemberOptions

type MemberOptions struct {
	UserID      string         `json:"user_id"`
	Role        string         `json:"role"`
	Permissions []string       `json:"permissions,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}

MemberOptions contains configuration for adding a member.

type Message

type Message struct {
	ID        string         `json:"id"`
	Type      string         `json:"type"` // "message", "presence", "typing", "system", "join", "leave", "error"
	Event     string         `json:"event,omitempty"`
	RoomID    string         `json:"room_id,omitempty"`
	ChannelID string         `json:"channel_id,omitempty"`
	UserID    string         `json:"user_id"`
	Data      any            `json:"data"`
	Metadata  map[string]any `json:"metadata,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
	ThreadID  string         `json:"thread_id,omitempty"` // For threaded conversations
}

Message represents a streaming message with rich metadata.

type MessageEdit

type MessageEdit struct {
	MessageID   string    `json:"message_id"`
	NewContent  any       `json:"new_content"`
	EditedBy    string    `json:"edited_by"`
	EditedAt    time.Time `json:"edited_at"`
	PrevContent any       `json:"prev_content,omitempty"`
}

MessageEdit represents an edit to a message.

type MessageError

type MessageError struct {
	MessageID string
	Op        string
	Err       error
}

MessageError wraps message-related errors with context.

func (*MessageError) Error

func (e *MessageError) Error() string

func (*MessageError) Unwrap

func (e *MessageError) Unwrap() error

type MessageFilter

type MessageFilter struct {
	// Time range
	StartTime time.Time
	EndTime   time.Time

	// User filter
	UserIDs []string

	// Message types
	Types []string

	// Thread filter
	ThreadID string

	// Search query
	SearchTerm string

	// Metadata filters
	Metadata map[string]any
}

MessageFilter provides advanced filtering for message retrieval.

type MessageHandler

type MessageHandler func(ctx context.Context, message *Message) error

MessageHandler processes messages from distributed channels.

type MessageReaction

type MessageReaction struct {
	MessageID string    `json:"message_id"`
	UserID    string    `json:"user_id"`
	Emoji     string    `json:"emoji"`
	Timestamp time.Time `json:"timestamp"`
}

MessageReaction represents a reaction to a message.

type MessageSearchQuery

type MessageSearchQuery struct {
	Query   string         `json:"query"`
	RoomID  string         `json:"room_id,omitempty"`
	UserID  string         `json:"user_id,omitempty"`
	Before  *time.Time     `json:"before,omitempty"`
	After   *time.Time     `json:"after,omitempty"`
	Limit   int            `json:"limit,omitempty"`
	Offset  int            `json:"offset,omitempty"`
	Filters map[string]any `json:"filters,omitempty"`
}

MessageSearchQuery represents a query for searching messages.

type MessageStats

type MessageStats struct {
	TotalMessages  int64            `json:"total_messages"`
	MessagesByType map[string]int64 `json:"messages_by_type"`
	MessagesByUser map[string]int64 `json:"messages_by_user"`
	MessagesPerDay map[string]int64 `json:"messages_per_day"`
	AveragePerDay  float64          `json:"average_per_day"`
	OldestMessage  time.Time        `json:"oldest_message"`
	NewestMessage  time.Time        `json:"newest_message"`
	StorageSize    int64            `json:"storage_size"` // Bytes
	CompressedSize int64            `json:"compressed_size,omitempty"`
	Extra          map[string]any   `json:"extra,omitempty"`
}

MessageStats provides statistics about messages.

type MessageStore

type MessageStore interface {
	// Save and retrieve messages
	Save(ctx context.Context, message *Message) error
	SaveBatch(ctx context.Context, messages []*Message) error
	Get(ctx context.Context, messageID string) (*Message, error)
	Delete(ctx context.Context, messageID string) error

	// History retrieval
	GetHistory(ctx context.Context, roomID string, query HistoryQuery) ([]*Message, error)
	GetThreadHistory(ctx context.Context, roomID, threadID string, query HistoryQuery) ([]*Message, error)
	GetUserMessages(ctx context.Context, userID string, query HistoryQuery) ([]*Message, error)

	// Search
	Search(ctx context.Context, roomID, searchTerm string, query HistoryQuery) ([]*Message, error)

	// Statistics
	GetMessageCount(ctx context.Context, roomID string) (int64, error)
	GetMessageCountByUser(ctx context.Context, roomID, userID string) (int64, error)

	// Cleanup
	DeleteOld(ctx context.Context, olderThan time.Duration) error
	DeleteByRoom(ctx context.Context, roomID string) error
	DeleteByUser(ctx context.Context, userID string) error

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}

MessageStore provides backend storage for message history and persistence.

type MessageStoreOptions

type MessageStoreOptions struct {
	// Enable message persistence
	Enabled bool

	// Retention period for messages
	RetentionPeriod time.Duration

	// Cleanup interval
	CleanupInterval time.Duration

	// Maximum messages per room
	MaxMessagesPerRoom int64

	// Index messages for search
	EnableSearch bool

	// Compress old messages
	CompressOld bool

	// Compress messages older than this duration
	CompressAfter time.Duration
}

MessageStoreOptions contains configuration for message persistence.

func DefaultMessageStoreOptions

func DefaultMessageStoreOptions() MessageStoreOptions

DefaultMessageStoreOptions returns default message store options.

type ModerationEvent

type ModerationEvent struct {
	ID          string         `json:"id"`
	Type        string         `json:"type"` // "ban", "unban", "mute", "kick", "warn", "slow_mode"
	RoomID      string         `json:"room_id"`
	TargetID    string         `json:"target_id,omitempty"`
	ModeratorID string         `json:"moderator_id"`
	Reason      string         `json:"reason,omitempty"`
	Timestamp   time.Time      `json:"timestamp"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}

ModerationEvent represents a moderation action in a room.

type ModerationStatus

type ModerationStatus string

ModerationStatus represents the status of a moderation action.

const (
	ModerationStatusPending  ModerationStatus = "pending"
	ModerationStatusApproved ModerationStatus = "approved"
	ModerationStatusRejected ModerationStatus = "rejected"
	ModerationStatusActive   ModerationStatus = "active"
	ModerationStatusExpired  ModerationStatus = "expired"
)

type NodeChangeEvent

type NodeChangeEvent struct {
	Type      string    `json:"type"` // "joined", "left", "updated"
	Node      NodeInfo  `json:"node"`
	Timestamp time.Time `json:"timestamp"`
}

NodeChangeEvent represents a node joining or leaving.

type NodeChangeHandler

type NodeChangeHandler func(event NodeChangeEvent)

NodeChangeHandler is called when nodes join or leave the cluster.

type NodeInfo

type NodeInfo struct {
	ID          string         `json:"id"`
	Address     string         `json:"address"`
	Started     time.Time      `json:"started"`
	LastSeen    time.Time      `json:"last_seen"`
	Connections int            `json:"connections"`
	Rooms       int            `json:"rooms"`
	Channels    int            `json:"channels"`
	Version     string         `json:"version"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}

NodeInfo contains information about a node in the cluster.

type OnlineStats

type OnlineStats struct {
	Current    int            `json:"current"`
	Peak24h    int            `json:"peak_24h"`
	Average24h float64        `json:"average_24h"`
	ByStatus   map[string]int `json:"by_status"`
	ByTimezone map[string]int `json:"by_timezone,omitempty"`
	Trend      string         `json:"trend"` // "increasing", "decreasing", "stable"
}

OnlineStats provides statistics about online users.

type Pagination

type Pagination struct {
	Limit  int
	Offset int
	Cursor string // For cursor-based pagination
}

Pagination handles result pagination.

type PresenceEvent

type PresenceEvent struct {
	Type      string         `json:"type"` // "online", "offline", "away", "busy"
	UserID    string         `json:"user_id"`
	Status    string         `json:"status"`
	Timestamp time.Time      `json:"timestamp"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

PresenceEvent represents a presence change event.

type PresenceFilters

type PresenceFilters struct {
	Status        []string  `json:"status,omitempty"`
	Online        bool      `json:"online,omitempty"`
	SinceActivity time.Time `json:"since_activity,omitempty"`
	RoomID        string    `json:"room_id,omitempty"`
	DeviceType    string    `json:"device_type,omitempty"`
}

PresenceFilters defines filters for querying presence data.

type PresenceOptions

type PresenceOptions struct {
	// Timeout after which user is considered offline
	OfflineTimeout time.Duration

	// Interval for cleanup of expired presence
	CleanupInterval time.Duration

	// Broadcast presence changes to rooms
	BroadcastChanges bool

	// Track activity automatically
	AutoTrackActivity bool
}

PresenceOptions contains configuration for presence tracking.

func DefaultPresenceOptions

func DefaultPresenceOptions() PresenceOptions

DefaultPresenceOptions returns default presence tracking options.

type PresenceStore

type PresenceStore interface {
	// Set and get presence
	Set(ctx context.Context, userID string, presence *UserPresence) error
	Get(ctx context.Context, userID string) (*UserPresence, error)
	GetMultiple(ctx context.Context, userIDs []string) ([]*UserPresence, error)
	Delete(ctx context.Context, userID string) error

	// Online users
	GetOnline(ctx context.Context) ([]string, error)
	SetOnline(ctx context.Context, userID string, ttl time.Duration) error
	SetOffline(ctx context.Context, userID string) error
	IsOnline(ctx context.Context, userID string) (bool, error)

	// Activity tracking
	UpdateActivity(ctx context.Context, userID string, timestamp time.Time) error
	GetLastActivity(ctx context.Context, userID string) (time.Time, error)

	// Cleanup
	CleanupExpired(ctx context.Context, olderThan time.Duration) error

	// Bulk operations
	SetMultiple(ctx context.Context, presences map[string]*UserPresence) error
	DeleteMultiple(ctx context.Context, userIDs []string) error

	// Advanced queries
	GetByStatus(ctx context.Context, status string) ([]*UserPresence, error)
	GetRecent(ctx context.Context, status string, since time.Duration) ([]*UserPresence, error)
	GetWithFilters(ctx context.Context, filters PresenceFilters) ([]*UserPresence, error)

	// History
	SaveHistory(ctx context.Context, userID string, event *PresenceEvent) error
	GetHistory(ctx context.Context, userID string, limit int) ([]*PresenceEvent, error)
	GetHistorySince(ctx context.Context, userID string, since time.Time) ([]*PresenceEvent, error)

	// Device tracking
	SetDevice(ctx context.Context, userID, deviceID string, device DeviceInfo) error
	GetDevices(ctx context.Context, userID string) ([]DeviceInfo, error)
	RemoveDevice(ctx context.Context, userID, deviceID string) error

	// Statistics
	CountByStatus(ctx context.Context) (map[string]int, error)
	GetActiveCount(ctx context.Context, since time.Duration) (int, error)

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}

PresenceStore provides backend storage for presence data.

type PresenceTracker

type PresenceTracker interface {
	// Set and get presence
	SetPresence(ctx context.Context, userID, status string) error
	GetPresence(ctx context.Context, userID string) (*UserPresence, error)
	GetPresences(ctx context.Context, userIDs []string) ([]*UserPresence, error)

	// Activity tracking
	TrackActivity(ctx context.Context, userID string) error
	GetLastSeen(ctx context.Context, userID string) (time.Time, error)

	// Online users
	GetOnlineUsers(ctx context.Context) ([]string, error)
	GetOnlineUsersInRoom(ctx context.Context, roomID string) ([]string, error)
	IsOnline(ctx context.Context, userID string) (bool, error)

	// Custom status
	SetCustomStatus(ctx context.Context, userID, customStatus string) error
	GetCustomStatus(ctx context.Context, userID string) (string, error)

	// Broadcasting
	BroadcastPresence(ctx context.Context, roomID, userID, status string) error

	// Cleanup expired presence
	CleanupExpired(ctx context.Context) error

	// Bulk Operations
	SetPresenceForUsers(ctx context.Context, updates map[string]string) error
	GetPresenceForRooms(ctx context.Context, roomIDs []string) (map[string][]*UserPresence, error)
	GetPresenceBulk(ctx context.Context, userIDs []string) (map[string]*UserPresence, error)

	// Advanced Queries
	GetUsersByStatus(ctx context.Context, status string) ([]string, error)
	GetRecentlyOnline(ctx context.Context, since time.Duration) ([]string, error)
	GetRecentlyOffline(ctx context.Context, since time.Duration) ([]string, error)
	GetAwayUsers(ctx context.Context) ([]string, error)
	GetBusyUsers(ctx context.Context) ([]string, error)

	// Presence History
	GetPresenceHistory(ctx context.Context, userID string, since time.Time) ([]*PresenceEvent, error)
	GetStatusChanges(ctx context.Context, userID string, limit int) ([]*PresenceEvent, error)

	// Device Management
	AddDevice(ctx context.Context, userID, deviceID string, deviceInfo DeviceInfo) error
	RemoveDevice(ctx context.Context, userID, deviceID string) error
	GetDevices(ctx context.Context, userID string) ([]DeviceInfo, error)
	GetActiveDevices(ctx context.Context, userID string) ([]DeviceInfo, error)

	// Rich Presence
	SetRichPresence(ctx context.Context, userID string, richData map[string]any) error
	GetRichPresence(ctx context.Context, userID string) (map[string]any, error)
	SetActivity(ctx context.Context, userID string, activity *ActivityInfo) error
	GetActivity(ctx context.Context, userID string) (*ActivityInfo, error)

	// Availability
	SetAvailability(ctx context.Context, userID string, available bool, message string) error
	IsAvailable(ctx context.Context, userID string) (bool, string, error)

	// Time-based
	GetOnlineUsersAt(ctx context.Context, timestamp time.Time) ([]string, error)
	GetPresenceDuration(ctx context.Context, userID string, status string, since time.Time) (time.Duration, error)

	// Watching/Subscriptions
	WatchUser(ctx context.Context, watcherID, watchedUserID string) error
	UnwatchUser(ctx context.Context, watcherID, watchedUserID string) error
	GetWatchers(ctx context.Context, userID string) ([]string, error)
	GetWatching(ctx context.Context, userID string) ([]string, error)

	// Statistics
	GetOnlineStats(ctx context.Context) (*OnlineStats, error)
	GetPresenceStats(ctx context.Context, userID string) (*UserPresenceStats, error)

	// Lifecycle
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

PresenceTracker manages user presence tracking (online/offline/away status).

type RateLimitStatus

type RateLimitStatus struct {
	Allowed    bool          `json:"allowed"`
	Remaining  int           `json:"remaining"`
	ResetAt    time.Time     `json:"reset_at"`
	RetryAfter time.Duration `json:"retry_after"`
}

RateLimitStatus represents rate limiting status for a user.

type Room

type Room interface {
	// Identity
	GetID() string
	GetName() string
	GetDescription() string
	GetOwner() string
	GetCreated() time.Time
	GetUpdated() time.Time
	GetMetadata() map[string]any

	// Membership
	Join(ctx context.Context, userID, role string) error
	Leave(ctx context.Context, userID string) error
	IsMember(ctx context.Context, userID string) (bool, error)
	GetMembers(ctx context.Context) ([]Member, error)
	GetMember(ctx context.Context, userID string) (Member, error)
	MemberCount(ctx context.Context) (int, error)

	// Advanced Membership Management
	GetMembersByRole(ctx context.Context, role string) ([]Member, error)
	UpdateMemberRole(ctx context.Context, userID, newRole string) error
	TransferOwnership(ctx context.Context, newOwnerID string) error
	BanMember(ctx context.Context, userID string, reason string, until *time.Time) error
	UnbanMember(ctx context.Context, userID string) error
	IsBanned(ctx context.Context, userID string) (bool, error)
	GetBannedMembers(ctx context.Context) ([]RoomBan, error)
	MuteMember(ctx context.Context, userID string, duration time.Duration) error
	UnmuteMember(ctx context.Context, userID string) error
	IsMuted(ctx context.Context, userID string) (bool, error)

	// Invitations
	CreateInvite(ctx context.Context, opts InviteOptions) (*Invite, error)
	RevokeInvite(ctx context.Context, inviteCode string) error
	ValidateInvite(ctx context.Context, inviteCode string) (bool, error)
	GetInvites(ctx context.Context) ([]*Invite, error)
	JoinWithInvite(ctx context.Context, userID, inviteCode string) error

	// Permissions
	HasPermission(ctx context.Context, userID, permission string) (bool, error)
	GrantPermission(ctx context.Context, userID, permission string) error
	RevokePermission(ctx context.Context, userID, permission string) error

	// Room Settings
	IsPrivate() bool
	SetPrivate(ctx context.Context, private bool) error
	GetMaxMembers() int
	SetMaxMembers(ctx context.Context, max int) error
	IsArchived() bool
	Archive(ctx context.Context) error
	Unarchive(ctx context.Context) error
	IsLocked() bool
	Lock(ctx context.Context, reason string) error
	Unlock(ctx context.Context) error

	// Moderation
	GetModerationLog(ctx context.Context, limit int) ([]ModerationEvent, error)
	SetSlowMode(ctx context.Context, intervalSeconds int) error
	GetSlowMode(ctx context.Context) int

	// Pinned Messages
	PinMessage(ctx context.Context, messageID string) error
	UnpinMessage(ctx context.Context, messageID string) error
	GetPinnedMessages(ctx context.Context) ([]string, error)

	// Categories/Tags
	AddTag(ctx context.Context, tag string) error
	RemoveTag(ctx context.Context, tag string) error
	GetTags() []string
	SetCategory(ctx context.Context, category string) error
	GetCategory() string

	// Statistics
	GetMessageCount(ctx context.Context) (int64, error)
	GetActiveMembers(ctx context.Context, since time.Duration) ([]Member, error)

	// Messaging
	Broadcast(ctx context.Context, message *Message) error
	BroadcastExcept(ctx context.Context, message *Message, excludeUserIDs []string) error
	BroadcastToRole(ctx context.Context, message *Message, role string) error

	// Read Receipts
	MarkAsRead(ctx context.Context, userID, messageID string) error
	GetUnreadCount(ctx context.Context, userID string, since time.Time) (int, error)
	GetLastReadMessage(ctx context.Context, userID string) (string, error)

	// Lifecycle
	Update(ctx context.Context, updates map[string]any) error
	Delete(ctx context.Context) error
}

Room represents a chat room or group for organizing conversations.

type RoomBan

type RoomBan struct {
	UserID    string         `json:"user_id"`
	RoomID    string         `json:"room_id"`
	Reason    string         `json:"reason"`
	BannedBy  string         `json:"banned_by"`
	BannedAt  time.Time      `json:"banned_at"`
	ExpiresAt *time.Time     `json:"expires_at,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

RoomBan represents a banned user in a room.

type RoomError

type RoomError struct {
	RoomID string
	UserID string
	Op     string
	Err    error
}

RoomError wraps room-related errors with context.

func (*RoomError) Error

func (e *RoomError) Error() string

func (*RoomError) Unwrap

func (e *RoomError) Unwrap() error

type RoomEvent

type RoomEvent struct {
	Type      string         `json:"type"` // "created", "updated", "deleted", "member_joined", "member_left"
	RoomID    string         `json:"room_id"`
	UserID    string         `json:"user_id,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
	Data      map[string]any `json:"data,omitempty"`
}

RoomEvent represents an event that occurs in a room.

type RoomOptions

type RoomOptions struct {
	ID          string         `json:"id,omitempty"`
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Owner       string         `json:"owner"`
	Private     bool           `json:"private"`
	MaxMembers  int            `json:"max_members,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}

RoomOptions contains configuration for creating a room.

type RoomStats

type RoomStats struct {
	TotalMessages   int64     `json:"total_messages"`
	TotalMembers    int       `json:"total_members"`
	ActiveMembers   int       `json:"active_members"`
	MessagesToday   int64     `json:"messages_today"`
	AverageMessages float64   `json:"average_messages"`
	PeakOnline      int       `json:"peak_online"`
	CreatedAt       time.Time `json:"created_at"`
	LastActivity    time.Time `json:"last_activity"`
}

RoomStats provides statistics for a specific room.

type RoomStore

type RoomStore interface {
	// CRUD operations
	Create(ctx context.Context, room Room) error
	Get(ctx context.Context, roomID string) (Room, error)
	Update(ctx context.Context, roomID string, updates map[string]any) error
	Delete(ctx context.Context, roomID string) error
	List(ctx context.Context, filters map[string]any) ([]Room, error)
	Exists(ctx context.Context, roomID string) (bool, error)

	// Bulk operations
	CreateMany(ctx context.Context, rooms []Room) error
	DeleteMany(ctx context.Context, roomIDs []string) error

	// Membership operations
	AddMember(ctx context.Context, roomID string, member Member) error
	RemoveMember(ctx context.Context, roomID, userID string) error
	GetMembers(ctx context.Context, roomID string) ([]Member, error)
	GetMember(ctx context.Context, roomID, userID string) (Member, error)
	IsMember(ctx context.Context, roomID, userID string) (bool, error)
	MemberCount(ctx context.Context, roomID string) (int, error)

	// User's rooms
	GetUserRooms(ctx context.Context, userID string) ([]Room, error)
	GetUserRoomsByRole(ctx context.Context, userID, role string) ([]Room, error)
	GetCommonRooms(ctx context.Context, userID1, userID2 string) ([]Room, error)

	// Advanced queries
	Search(ctx context.Context, query string, filters map[string]any) ([]Room, error)
	FindByTag(ctx context.Context, tag string) ([]Room, error)
	FindByCategory(ctx context.Context, category string) ([]Room, error)
	GetPublicRooms(ctx context.Context, limit int) ([]Room, error)
	GetArchivedRooms(ctx context.Context, userID string) ([]Room, error)

	// Statistics
	GetRoomCount(ctx context.Context) (int, error)
	GetTotalMembers(ctx context.Context) (int, error)

	// Bans and invites
	BanMember(ctx context.Context, roomID, userID string, ban RoomBan) error
	UnbanMember(ctx context.Context, roomID, userID string) error
	GetBans(ctx context.Context, roomID string) ([]RoomBan, error)
	IsBanned(ctx context.Context, roomID, userID string) (bool, error)
	SaveInvite(ctx context.Context, roomID string, invite *Invite) error
	GetInvite(ctx context.Context, inviteCode string) (*Invite, error)
	DeleteInvite(ctx context.Context, inviteCode string) error
	ListInvites(ctx context.Context, roomID string) ([]*Invite, error)

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}

RoomStore provides backend storage for rooms.

type Subscription

type Subscription interface {
	GetConnID() string
	GetUserID() string
	GetSubscribedAt() time.Time
	GetFilters() map[string]any

	SetFilters(filters map[string]any)
	MatchesFilter(message *Message) bool
}

Subscription represents a connection's subscription to a channel.

type SubscriptionOptions

type SubscriptionOptions struct {
	ConnID  string         `json:"conn_id"`
	UserID  string         `json:"user_id"`
	Filters map[string]any `json:"filters,omitempty"` // Message filters (e.g., event type, user ID)
}

SubscriptionOptions contains configuration for subscribing to a channel.

type TypingEvent

type TypingEvent struct {
	Type      string    `json:"type"` // "started", "stopped"
	UserID    string    `json:"user_id"`
	RoomID    string    `json:"room_id"`
	Timestamp time.Time `json:"timestamp"`
}

TypingEvent represents a typing indicator event.

type TypingOptions

type TypingOptions struct {
	// Timeout after which typing indicator expires
	TypingTimeout time.Duration

	// Interval for cleanup of expired indicators
	CleanupInterval time.Duration

	// Broadcast typing events
	BroadcastEvents bool

	// Max typing users to track per room (prevents abuse)
	MaxTypingUsers int
}

TypingOptions contains configuration for typing indicators.

func DefaultTypingOptions

func DefaultTypingOptions() TypingOptions

DefaultTypingOptions returns default typing indicator options.

type TypingStore

type TypingStore interface {
	// Set and get typing status
	SetTyping(ctx context.Context, userID, roomID string, expiresAt time.Time) error
	RemoveTyping(ctx context.Context, userID, roomID string) error
	GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
	IsTyping(ctx context.Context, userID, roomID string) (bool, error)

	// Cleanup
	CleanupExpired(ctx context.Context) error

	// Lifecycle
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}

TypingStore provides backend storage for typing indicators.

type TypingTracker

type TypingTracker interface {
	// Start and stop typing
	StartTyping(ctx context.Context, userID, roomID string) error
	StopTyping(ctx context.Context, userID, roomID string) error

	// Get typing users
	GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
	IsTyping(ctx context.Context, userID, roomID string) (bool, error)

	// Broadcasting
	BroadcastTyping(ctx context.Context, roomID, userID string, isTyping bool) error

	// Cleanup expired typing indicators
	CleanupExpired(ctx context.Context) error

	// Lifecycle
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

TypingTracker manages typing indicator tracking per room/channel.

type UserPresence

type UserPresence struct {
	UserID       string         `json:"user_id"`
	Status       string         `json:"status"` // "online", "away", "busy", "offline"
	LastSeen     time.Time      `json:"last_seen"`
	Connections  []string       `json:"connections"`
	CustomStatus string         `json:"custom_status,omitempty"`
	Metadata     map[string]any `json:"metadata,omitempty"`
}

UserPresence represents a user's online presence status.

type UserPresenceStats

type UserPresenceStats struct {
	TotalOnlineTime   time.Duration  `json:"total_online_time"`
	AverageOnlineTime time.Duration  `json:"average_online_time"`
	LastOnline        time.Time      `json:"last_online"`
	StatusChanges     int            `json:"status_changes"`
	MostCommonStatus  string         `json:"most_common_status"`
	DeviceCount       int            `json:"device_count"`
	Metadata          map[string]any `json:"metadata,omitempty"`
}

UserPresenceStats provides detailed statistics for a user's presence.

type UserStats

type UserStats struct {
	MessagesSent    int64         `json:"messages_sent"`
	RoomsJoined     int           `json:"rooms_joined"`
	OnlineTime      time.Duration `json:"online_time"`
	LastSeen        time.Time     `json:"last_seen"`
	AverageActivity float64       `json:"average_activity"`
}

UserStats provides statistics for a specific user.

type WebhookConfig

type WebhookConfig struct {
	ID         string            `json:"id"`
	URL        string            `json:"url"`
	Events     []string          `json:"events"`
	Secret     string            `json:"secret,omitempty"`
	Active     bool              `json:"active"`
	Headers    map[string]string `json:"headers,omitempty"`
	Timeout    time.Duration     `json:"timeout,omitempty"`
	RetryCount int               `json:"retry_count,omitempty"`
	CreatedAt  time.Time         `json:"created_at"`
	UpdatedAt  time.Time         `json:"updated_at"`
}

WebhookConfig represents webhook configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL