Documentation
¶
Index ¶
- Constants
- Variables
- func NewBackendError(backend, op string, err error) error
- func NewChannelError(channelID, op string, err error) error
- func NewConnectionError(connID, op string, err error) error
- func NewMessageError(messageID, op string, err error) error
- func NewRoomError(roomID, userID, op string, err error) error
- type ActivityInfo
- type AnalyticsEvent
- type AnalyticsQuery
- type AnalyticsResult
- type Availability
- type BackendError
- type Channel
- type ChannelError
- type ChannelOptions
- type ChannelPattern
- type ChannelStore
- type ClusterStats
- type Config
- type ConfigOption
- func WithAuthentication(username, password string) ConfigOption
- func WithBackend(backend string) ConfigOption
- func WithBackendURLs(urls ...string) ConfigOption
- func WithBufferSizes(read, write int) ConfigOption
- func WithConfig(config Config) ConfigOption
- func WithConnectionLimits(perUser, roomsPerUser, channelsPerUser int) ConfigOption
- func WithFeatures(rooms, channels, presence, typing, history bool) ConfigOption
- func WithLocalBackend() ConfigOption
- func WithMessageLimits(maxSize, maxPerSecond int) ConfigOption
- func WithMessageRetention(retention time.Duration) ConfigOption
- func WithNATSBackend(urls ...string) ConfigOption
- func WithNodeID(nodeID string) ConfigOption
- func WithPresenceTimeout(timeout time.Duration) ConfigOption
- func WithRedisBackend(url string) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithTLS(certFile, keyFile, caFile string) ConfigOption
- func WithTimeouts(ping, pong, write time.Duration) ConfigOption
- func WithTypingTimeout(timeout time.Duration) ConfigOption
- type ConnectionError
- type ConnectionInfo
- type DeviceInfo
- type DistributedBackend
- type DistributedBackendOptions
- type EnhancedConnection
- type FileInfo
- type FileQuery
- type FileUpload
- type HistoryQuery
- type Invite
- type InviteOptions
- type Lock
- type Manager
- type ManagerStats
- type Member
- type MemberOptions
- type Message
- type MessageEdit
- type MessageError
- type MessageFilter
- type MessageHandler
- type MessageReaction
- type MessageSearchQuery
- type MessageStats
- type MessageStore
- type MessageStoreOptions
- type ModerationEvent
- type ModerationStatus
- type NodeChangeEvent
- type NodeChangeHandler
- type NodeInfo
- type OnlineStats
- type Pagination
- type PresenceEvent
- type PresenceFilters
- type PresenceOptions
- type PresenceStore
- type PresenceTracker
- type RateLimitStatus
- type Room
- type RoomBan
- type RoomError
- type RoomEvent
- type RoomOptions
- type RoomStats
- type RoomStore
- type Subscription
- type SubscriptionOptions
- type TypingEvent
- type TypingOptions
- type TypingStore
- type TypingTracker
- type UserPresence
- type UserPresenceStats
- type UserStats
- type WebhookConfig
Constants ¶
// Common channel patterns.
//
// Each constant sits on its own line so that its leading comment cannot
// swallow the declarations that follow it.
const (
	// PatternExact matches exact channel name
	PatternExact = "exact"
	// PatternWildcard allows * wildcards (e.g., "user.*.events")
	PatternWildcard = "wildcard"
	// PatternPrefix matches channel name prefix (e.g., "system.")
	PatternPrefix = "prefix"
	// PatternSuffix matches channel name suffix (e.g., ".events")
	PatternSuffix = "suffix"
)
Common channel patterns
// Node change event types.
const (
	NodeEventJoined  = "joined"
	NodeEventLeft    = "left"
	NodeEventUpdated = "updated"
)
Node change event types
// Activity types.
const (
	ActivityTypePlaying   = "playing"
	ActivityTypeListening = "listening"
	ActivityTypeWatching  = "watching"
	ActivityTypeStreaming = "streaming"
	ActivityTypeCustom    = "custom"
)
Activity types
// Common room roles.
const (
	RoleOwner  = "owner"
	RoleAdmin  = "admin"
	RoleMember = "member"
	RoleGuest  = "guest"
)
Common room roles
// Common permissions.
const (
	PermissionSendMessage   = "send_message"
	PermissionDeleteMessage = "delete_message"
	PermissionInviteMembers = "invite_members"
	PermissionRemoveMembers = "remove_members"
	PermissionManageRoom    = "manage_room"
	PermissionManageRoles   = "manage_roles"
)
Common permissions
// Room event types.
const (
	RoomEventCreated      = "created"
	RoomEventUpdated      = "updated"
	RoomEventDeleted      = "deleted"
	RoomEventMemberJoined = "member_joined"
	RoomEventMemberLeft   = "member_left"
	RoomEventMemberKicked = "member_kicked"
	RoomEventMemberBanned = "member_banned"
)
Room event types
// Moderation event types.
const (
	ModerationEventBan      = "ban"
	ModerationEventUnban    = "unban"
	ModerationEventMute     = "mute"
	ModerationEventUnmute   = "unmute"
	ModerationEventKick     = "kick"
	ModerationEventWarn     = "warn"
	ModerationEventSlowMode = "slow_mode"
	ModerationEventLock     = "lock"
	ModerationEventUnlock   = "unlock"
)
Moderation event types
// Message types.
const (
	MessageTypeMessage  = "message"
	MessageTypePresence = "presence"
	MessageTypeTyping   = "typing"
	MessageTypeSystem   = "system"
	MessageTypeJoin     = "join"
	MessageTypeLeave    = "leave"
	MessageTypeError    = "error"
)
Message types
// Status constants.
const (
	StatusOnline  = "online"
	StatusAway    = "away"
	StatusBusy    = "busy"
	StatusOffline = "offline"
)
Status constants
// Typing event types.
const (
	TypingEventStarted = "started"
	TypingEventStopped = "stopped"
)
Typing event types
Variables ¶
// Common errors.
//
// These are sentinel values; compare against them with errors.Is. Each
// declaration sits on its own line so the group comments cannot swallow the
// declarations that follow them.
var (
	// Connection errors
	ErrConnectionNotFound     = errors.New("connection not found")
	ErrConnectionClosed       = errors.New("connection closed")
	ErrConnectionLimitReached = errors.New("connection limit reached")
	ErrInvalidConnection      = errors.New("invalid connection")

	// Room errors
	ErrRoomNotFound      = errors.New("room not found")
	ErrRoomAlreadyExists = errors.New("room already exists")
	ErrRoomFull          = errors.New("room is full")
	ErrNotRoomMember     = errors.New("not a member of the room")
	ErrAlreadyRoomMember = errors.New("already a member of the room")
	ErrInvalidRoom       = errors.New("invalid room")
	ErrRoomLimitReached  = errors.New("room limit reached")

	// Channel errors
	ErrChannelNotFound      = errors.New("channel not found")
	ErrChannelAlreadyExists = errors.New("channel already exists")
	ErrNotSubscribed        = errors.New("not subscribed to channel")
	ErrAlreadySubscribed    = errors.New("already subscribed to channel")
	ErrInvalidChannel       = errors.New("invalid channel")

	// Permission errors
	ErrPermissionDenied  = errors.New("permission denied")
	ErrInvalidPermission = errors.New("invalid permission")
	ErrInsufficientRole  = errors.New("insufficient role")

	// Message errors
	ErrMessageTooLarge = errors.New("message too large")
	ErrInvalidMessage  = errors.New("invalid message")
	ErrMessageNotFound = errors.New("message not found")

	// Presence errors
	ErrPresenceNotFound = errors.New("presence not found")
	ErrInvalidStatus    = errors.New("invalid status")

	// Invite errors
	ErrInviteNotFound = errors.New("invite not found")
	ErrInviteExpired  = errors.New("invite expired")

	// Backend errors
	ErrBackendNotConnected = errors.New("backend not connected")
	ErrBackendTimeout      = errors.New("backend operation timeout")

	// Configuration errors
	ErrInvalidConfig = errors.New("invalid configuration")
	ErrMissingConfig = errors.New("missing required configuration")

	// Distributed errors
	ErrNodeNotFound          = errors.New("node not found")
	ErrLockAcquisitionFailed = errors.New("failed to acquire lock")
	ErrLockNotHeld           = errors.New("lock not held")
)
Common errors
Functions ¶
func NewBackendError ¶
func NewChannelError ¶
func NewConnectionError ¶
func NewMessageError ¶
func NewRoomError ¶
Types ¶
type ActivityInfo ¶
// ActivityInfo represents rich activity/presence information (Discord-style).
type ActivityInfo struct {
	Type    string `json:"type"` // "playing", "listening", "watching", "streaming", "custom"
	Name    string `json:"name"`
	Details string `json:"details,omitempty"`
	State   string `json:"state,omitempty"`
	// StartTime is tagged omitzero rather than omitempty: encoding/json never
	// treats a struct value such as time.Time as "empty", so omitempty had no
	// effect and the zero time was always serialized. omitzero is honoured by
	// Go 1.24+; older toolchains ignore the option and keep the old output.
	StartTime time.Time      `json:"start_time,omitzero"`
	EndTime   *time.Time     `json:"end_time,omitempty"`
	Assets    map[string]any `json:"assets,omitempty"` // Images, icons, etc.
	Metadata  map[string]any `json:"metadata,omitempty"`
}
ActivityInfo represents rich activity/presence information (Discord-style).
type AnalyticsEvent ¶
// AnalyticsEvent represents an event for analytics tracking.
type AnalyticsEvent struct {
	Type      string         `json:"type"`
	UserID    string         `json:"user_id,omitempty"`
	RoomID    string         `json:"room_id,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
}
AnalyticsEvent represents an event for analytics tracking.
type AnalyticsQuery ¶
// AnalyticsQuery represents a query for analytics data.
type AnalyticsQuery struct {
	Metric    string         `json:"metric"`
	StartTime time.Time      `json:"start_time"`
	EndTime   time.Time      `json:"end_time"`
	GroupBy   string         `json:"group_by,omitempty"`
	Filters   map[string]any `json:"filters,omitempty"`
	Limit     int            `json:"limit,omitempty"`
}
AnalyticsQuery represents a query for analytics data.
type AnalyticsResult ¶
// AnalyticsResult represents the result of an analytics query.
type AnalyticsResult struct {
	Metric    string            `json:"metric"`
	Value     float64           `json:"value"`
	Timestamp time.Time         `json:"timestamp"`
	Labels    map[string]string `json:"labels,omitempty"`
	Metadata  map[string]any    `json:"metadata,omitempty"`
}
AnalyticsResult represents the result of an analytics query.
type Availability ¶
// Availability represents user availability status.
type Availability struct {
	Available bool      `json:"available"`
	Message   string    `json:"message,omitempty"`
	UpdatedAt time.Time `json:"updated_at"`
}
Availability represents user availability status.
type BackendError ¶
BackendError wraps backend-related errors with context.
func (*BackendError) Error ¶
func (e *BackendError) Error() string
func (*BackendError) Unwrap ¶
func (e *BackendError) Unwrap() error
type Channel ¶
// Channel represents a pub/sub topic for message broadcasting.
type Channel interface {
	// Identity accessors.
	GetID() string
	GetName() string
	GetCreated() time.Time
	GetMessageCount() int64

	// Subscription management.
	Subscribe(ctx context.Context, sub Subscription) error
	Unsubscribe(ctx context.Context, connID string) error
	GetSubscribers(ctx context.Context) ([]Subscription, error)
	GetSubscriberCount(ctx context.Context) (int, error)
	IsSubscribed(ctx context.Context, connID string) (bool, error)

	// Publishing.
	Publish(ctx context.Context, message *Message) error

	// Lifecycle.
	Delete(ctx context.Context) error
}
Channel represents a pub/sub topic for message broadcasting.
type ChannelError ¶
ChannelError wraps channel-related errors with context.
func (*ChannelError) Error ¶
func (e *ChannelError) Error() string
func (*ChannelError) Unwrap ¶
func (e *ChannelError) Unwrap() error
type ChannelOptions ¶
// ChannelOptions contains configuration for creating a channel.
type ChannelOptions struct {
	ID          string         `json:"id,omitempty"`
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Private     bool           `json:"private"`
	Persistent  bool           `json:"persistent"` // Persist messages for late subscribers
	Metadata    map[string]any `json:"metadata,omitempty"`
}
ChannelOptions contains configuration for creating a channel.
type ChannelPattern ¶
// ChannelPattern represents pattern-matching for channel subscriptions.
type ChannelPattern interface {
	// Match reports whether channelName matches the pattern.
	Match(channelName string) bool

	// GetPattern returns the pattern string.
	GetPattern() string
}
ChannelPattern represents pattern-matching for channel subscriptions.
type ChannelStore ¶
// ChannelStore provides backend storage for channels.
type ChannelStore interface {
	// CRUD operations.
	Create(ctx context.Context, channel Channel) error
	Get(ctx context.Context, channelID string) (Channel, error)
	Delete(ctx context.Context, channelID string) error
	List(ctx context.Context) ([]Channel, error)
	Exists(ctx context.Context, channelID string) (bool, error)

	// Subscription operations.
	AddSubscription(ctx context.Context, channelID string, sub Subscription) error
	RemoveSubscription(ctx context.Context, channelID, connID string) error
	GetSubscriptions(ctx context.Context, channelID string) ([]Subscription, error)
	GetSubscriberCount(ctx context.Context, channelID string) (int, error)
	IsSubscribed(ctx context.Context, channelID, connID string) (bool, error)

	// Publishing (for distributed backends).
	Publish(ctx context.Context, channelID string, message *Message) error

	// Channels of a given user.
	GetUserChannels(ctx context.Context, userID string) ([]Channel, error)

	// Lifecycle.
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
}
ChannelStore provides backend storage for channels.
type ClusterStats ¶
// ClusterStats provides statistics about the streaming cluster.
type ClusterStats struct {
	TotalNodes        int            `json:"total_nodes"`
	ActiveNodes       int            `json:"active_nodes"`
	TotalConnections  int            `json:"total_connections"`
	TotalRooms        int            `json:"total_rooms"`
	TotalChannels     int            `json:"total_channels"`
	MessagesPerSecond float64        `json:"messages_per_second"`
	Nodes             []NodeInfo     `json:"nodes"`
	Uptime            time.Duration  `json:"uptime"`
	Extra             map[string]any `json:"extra,omitempty"`
}
ClusterStats provides statistics about the streaming cluster.
type Config ¶
// Config contains all configuration for the streaming extension.
//
// NOTE(review): BackendPassword carries json and yaml tags, so a marshaled
// Config includes the credential; confirm serialized configs are never logged
// or exposed.
type Config struct {
	// Backend configuration
	Backend         string   `json:"backend" yaml:"backend"`                   // "local", "redis", "nats"
	BackendURLs     []string `json:"backend_urls" yaml:"backend_urls"`         // Connection URLs
	BackendUsername string   `json:"backend_username" yaml:"backend_username"` // Authentication username
	BackendPassword string   `json:"backend_password" yaml:"backend_password"` // Authentication password

	// Feature toggles
	EnableRooms            bool `json:"enable_rooms" yaml:"enable_rooms"`
	EnableChannels         bool `json:"enable_channels" yaml:"enable_channels"`
	EnablePresence         bool `json:"enable_presence" yaml:"enable_presence"`
	EnableTypingIndicators bool `json:"enable_typing_indicators" yaml:"enable_typing_indicators"`
	EnableMessageHistory   bool `json:"enable_message_history" yaml:"enable_message_history"`
	EnableDistributed      bool `json:"enable_distributed" yaml:"enable_distributed"`

	// Connection limits
	MaxConnectionsPerUser int `json:"max_connections_per_user" yaml:"max_connections_per_user"`
	MaxRoomsPerUser       int `json:"max_rooms_per_user" yaml:"max_rooms_per_user"`
	MaxChannelsPerUser    int `json:"max_channels_per_user" yaml:"max_channels_per_user"`
	MaxMessageSize        int `json:"max_message_size" yaml:"max_message_size"` // Bytes
	MaxMessagesPerSecond  int `json:"max_messages_per_second" yaml:"max_messages_per_second"`

	// Timeouts
	PingInterval    time.Duration `json:"ping_interval" yaml:"ping_interval"`
	PongTimeout     time.Duration `json:"pong_timeout" yaml:"pong_timeout"`
	WriteTimeout    time.Duration `json:"write_timeout" yaml:"write_timeout"`
	ReadBufferSize  int           `json:"read_buffer_size" yaml:"read_buffer_size"`
	WriteBufferSize int           `json:"write_buffer_size" yaml:"write_buffer_size"`

	// Message persistence
	MessageRetention   time.Duration `json:"message_retention" yaml:"message_retention"`
	MessageCleanup     time.Duration `json:"message_cleanup" yaml:"message_cleanup"`
	MaxMessagesPerRoom int64         `json:"max_messages_per_room" yaml:"max_messages_per_room"`

	// Presence settings
	PresenceTimeout time.Duration `json:"presence_timeout" yaml:"presence_timeout"`
	PresenceCleanup time.Duration `json:"presence_cleanup" yaml:"presence_cleanup"`

	// Typing settings
	TypingTimeout         time.Duration `json:"typing_timeout" yaml:"typing_timeout"`
	TypingCleanup         time.Duration `json:"typing_cleanup" yaml:"typing_cleanup"`
	MaxTypingUsersPerRoom int           `json:"max_typing_users_per_room" yaml:"max_typing_users_per_room"`

	// Distributed settings
	NodeID            string        `json:"node_id" yaml:"node_id"`
	HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
	NodeTimeout       time.Duration `json:"node_timeout" yaml:"node_timeout"`

	// TLS
	TLSEnabled  bool   `json:"tls_enabled" yaml:"tls_enabled"`
	TLSCertFile string `json:"tls_cert_file" yaml:"tls_cert_file"`
	TLSKeyFile  string `json:"tls_key_file" yaml:"tls_key_file"`
	TLSCAFile   string `json:"tls_ca_file" yaml:"tls_ca_file"`

	// Misc
	RequireConfig bool `json:"-" yaml:"-"` // Internal flag for config loading
}
Config contains all configuration for the streaming extension.
type ConfigOption ¶
// ConfigOption is a functional option for Config: a function that receives a
// *Config.
type ConfigOption func(*Config)
ConfigOption is a functional option for Config.
func WithAuthentication ¶
func WithAuthentication(username, password string) ConfigOption
WithAuthentication sets backend authentication credentials.
func WithBackendURLs ¶
func WithBackendURLs(urls ...string) ConfigOption
WithBackendURLs sets the backend connection URLs.
func WithBufferSizes ¶
func WithBufferSizes(read, write int) ConfigOption
WithBufferSizes sets the read and write buffer sizes.
func WithConfig ¶
func WithConfig(config Config) ConfigOption
WithConfig sets the complete configuration.
func WithConnectionLimits ¶
func WithConnectionLimits(perUser, roomsPerUser, channelsPerUser int) ConfigOption
WithConnectionLimits sets the per-user limits on connections, rooms, and channels.
func WithFeatures ¶
func WithFeatures(rooms, channels, presence, typing, history bool) ConfigOption
WithFeatures enables or disables the rooms, channels, presence, typing, and history features.
func WithLocalBackend ¶
func WithLocalBackend() ConfigOption
WithLocalBackend configures local in-memory backend.
func WithMessageLimits ¶
func WithMessageLimits(maxSize, maxPerSecond int) ConfigOption
WithMessageLimits sets the maximum message size and the maximum number of messages per second.
func WithMessageRetention ¶
func WithMessageRetention(retention time.Duration) ConfigOption
WithMessageRetention sets message retention period.
func WithNATSBackend ¶
func WithNATSBackend(urls ...string) ConfigOption
WithNATSBackend configures NATS backend.
func WithNodeID ¶
func WithNodeID(nodeID string) ConfigOption
WithNodeID sets the node ID for distributed mode.
func WithPresenceTimeout ¶
func WithPresenceTimeout(timeout time.Duration) ConfigOption
WithPresenceTimeout sets presence timeout.
func WithRedisBackend ¶
func WithRedisBackend(url string) ConfigOption
WithRedisBackend configures Redis backend.
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
WithRequireConfig requires configuration from ConfigManager.
func WithTLS ¶
func WithTLS(certFile, keyFile, caFile string) ConfigOption
WithTLS enables TLS with certificate files.
func WithTimeouts ¶
func WithTimeouts(ping, pong, write time.Duration) ConfigOption
WithTimeouts sets the ping, pong, and write timeouts for connections.
func WithTypingTimeout ¶
func WithTypingTimeout(timeout time.Duration) ConfigOption
WithTypingTimeout sets typing indicator timeout.
type ConnectionError ¶
ConnectionError wraps connection-related errors with context.
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶
func (e *ConnectionError) Unwrap() error
type ConnectionInfo ¶
// ConnectionInfo provides detailed information about a connection.
type ConnectionInfo struct {
	ID            string         `json:"id"`
	UserID        string         `json:"user_id"`
	SessionID     string         `json:"session_id"`
	ConnectedAt   time.Time      `json:"connected_at"`
	LastActivity  time.Time      `json:"last_activity"`
	RoomsJoined   []string       `json:"rooms_joined"`
	Subscriptions []string       `json:"subscriptions"`
	IPAddress     string         `json:"ip_address,omitempty"`
	UserAgent     string         `json:"user_agent,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`
}
ConnectionInfo provides detailed information about a connection.
type DeviceInfo ¶
// DeviceInfo represents information about a user's device.
type DeviceInfo struct {
	DeviceID string         `json:"device_id"`
	Type     string         `json:"type"` // "mobile", "desktop", "tablet", "web"
	OS       string         `json:"os,omitempty"`
	Browser  string         `json:"browser,omitempty"`
	LastSeen time.Time      `json:"last_seen"`
	IP       string         `json:"ip,omitempty"`
	Active   bool           `json:"active"`
	Metadata map[string]any `json:"metadata,omitempty"`
}
DeviceInfo represents information about a user's device.
type DistributedBackend ¶
// DistributedBackend provides cross-node coordination for distributed streaming.
type DistributedBackend interface {
	// Node coordination.
	RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error
	UnregisterNode(ctx context.Context, nodeID string) error
	GetNodes(ctx context.Context) ([]NodeInfo, error)
	GetNode(ctx context.Context, nodeID string) (*NodeInfo, error)
	Heartbeat(ctx context.Context, nodeID string) error

	// Message broadcasting across nodes.
	Publish(ctx context.Context, channel string, message *Message) error
	Subscribe(ctx context.Context, channel string, handler MessageHandler) error
	Unsubscribe(ctx context.Context, channel string) error

	// Distributed presence.
	SetPresence(ctx context.Context, userID, status string, ttl time.Duration) error
	GetPresence(ctx context.Context, userID string) (*UserPresence, error)
	GetOnlineUsers(ctx context.Context) ([]string, error)

	// Distributed locks (for exclusive operations).
	AcquireLock(ctx context.Context, key string, ttl time.Duration) (Lock, error)
	ReleaseLock(ctx context.Context, lock Lock) error

	// Distributed counters.
	Increment(ctx context.Context, key string) (int64, error)
	Decrement(ctx context.Context, key string) (int64, error)
	GetCounter(ctx context.Context, key string) (int64, error)

	// Node discovery.
	DiscoverNodes(ctx context.Context) ([]NodeInfo, error)
	WatchNodes(ctx context.Context, handler NodeChangeHandler) error

	// Health check.
	Ping(ctx context.Context) error

	// Lifecycle.
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
}
DistributedBackend provides cross-node coordination for distributed streaming.
type DistributedBackendOptions ¶
// DistributedBackendOptions contains configuration for distributed backend.
type DistributedBackendOptions struct {
	// Backend type: "redis", "nats", ""
	Type string

	// Connection URLs
	URLs []string

	// Node ID (unique identifier for this instance)
	NodeID string

	// Heartbeat interval
	HeartbeatInterval time.Duration

	// Node timeout (consider dead after this)
	NodeTimeout time.Duration

	// Reconnect settings
	MaxReconnectAttempts int
	ReconnectBackoff     time.Duration

	// TLS configuration
	TLSEnabled  bool
	TLSCertFile string
	TLSKeyFile  string
	TLSCAFile   string

	// Authentication
	Username string
	Password string

	// Additional options
	Options map[string]any
}
DistributedBackendOptions contains configuration for distributed backend.
func DefaultDistributedBackendOptions ¶
func DefaultDistributedBackendOptions() DistributedBackendOptions
DefaultDistributedBackendOptions returns default distributed backend options.
type EnhancedConnection ¶
// EnhancedConnection wraps the core forge.Connection with streaming metadata.
type EnhancedConnection interface {
	forge.Connection

	// Metadata.
	GetUserID() string
	SetUserID(userID string)
	GetSessionID() string
	SetSessionID(sessionID string)
	GetMetadata(key string) (any, bool)
	SetMetadata(key string, value any)

	// Tracking.
	GetJoinedRooms() []string
	AddRoom(roomID string)
	RemoveRoom(roomID string)
	IsInRoom(roomID string) bool
	GetSubscriptions() []string
	AddSubscription(channelID string)
	RemoveSubscription(channelID string)
	IsSubscribed(channelID string) bool
	GetLastActivity() time.Time
	UpdateActivity()

	// State.
	IsClosed() bool
	MarkClosed()
}
EnhancedConnection wraps the core forge.Connection with streaming metadata.
type FileInfo ¶
// FileInfo represents information about an uploaded file.
type FileInfo struct {
	ID           string         `json:"id"`
	Filename     string         `json:"filename"`
	ContentType  string         `json:"content_type"`
	Size         int64          `json:"size"`
	UserID       string         `json:"user_id"`
	RoomID       string         `json:"room_id,omitempty"`
	URL          string         `json:"url"`
	ThumbnailURL string         `json:"thumbnail_url,omitempty"`
	Metadata     map[string]any `json:"metadata,omitempty"`
	UploadedAt   time.Time      `json:"uploaded_at"`
	ExpiresAt    *time.Time     `json:"expires_at,omitempty"`
}
FileInfo represents information about an uploaded file.
type FileQuery ¶
// FileQuery represents a query for searching files.
type FileQuery struct {
	UserID      string         `json:"user_id,omitempty"`
	RoomID      string         `json:"room_id,omitempty"`
	ContentType string         `json:"content_type,omitempty"`
	Before      *time.Time     `json:"before,omitempty"`
	After       *time.Time     `json:"after,omitempty"`
	Limit       int            `json:"limit,omitempty"`
	Offset      int            `json:"offset,omitempty"`
	Filters     map[string]any `json:"filters,omitempty"`
}
FileQuery represents a query for searching files.
type FileUpload ¶
// FileUpload represents a file upload request.
type FileUpload struct {
	ID          string         `json:"id"`
	Filename    string         `json:"filename"`
	ContentType string         `json:"content_type"`
	Size        int64          `json:"size"`
	UserID      string         `json:"user_id"`
	RoomID      string         `json:"room_id,omitempty"`
	Data        []byte         `json:"data,omitempty"`
	URL         string         `json:"url,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	UploadedAt  time.Time      `json:"uploaded_at"`
}
FileUpload represents a file upload request.
type HistoryQuery ¶
// HistoryQuery defines parameters for retrieving message history.
type HistoryQuery struct {
	Limit      int       // Maximum number of messages
	Before     time.Time // Messages before this timestamp
	After      time.Time // Messages after this timestamp
	ThreadID   string    // Filter by thread
	UserID     string    // Filter by user
	SearchTerm string    // Full-text search
}
HistoryQuery defines parameters for retrieving message history.
type Invite ¶
// Invite represents a room invitation.
type Invite struct {
	Code      string         `json:"code"`
	RoomID    string         `json:"room_id"`
	CreatedBy string         `json:"created_by"`
	CreatedAt time.Time      `json:"created_at"`
	ExpiresAt *time.Time     `json:"expires_at,omitempty"`
	MaxUses   int            `json:"max_uses"`
	UsedCount int            `json:"used_count"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}
Invite represents a room invitation.
type InviteOptions ¶
// InviteOptions contains configuration for creating a room invitation.
type InviteOptions struct {
	Duration time.Duration  `json:"duration,omitempty"`
	MaxUses  int            `json:"max_uses,omitempty"`
	Role     string         `json:"role,omitempty"`
	Metadata map[string]any `json:"metadata,omitempty"`
}
InviteOptions contains configuration for creating a room invitation.
type Lock ¶
// Lock represents a distributed lock.
type Lock interface {
	// Identity and expiry accessors.
	GetKey() string
	GetOwner() string
	GetExpiry() time.Time

	// Renew takes a context and a TTL and reports failure through its error
	// result.
	Renew(ctx context.Context, ttl time.Duration) error

	// IsHeld reports the lock's held state as a bool.
	IsHeld() bool
}
Lock represents a distributed lock.
type Manager ¶
// Manager is the central interface for managing streaming connections, rooms,
// channels, presence, and message broadcasting.
type Manager interface {
	// Connection management.
	Register(conn EnhancedConnection) error
	Unregister(connID string) error
	GetConnection(connID string) (EnhancedConnection, error)
	GetUserConnections(userID string) []EnhancedConnection
	GetAllConnections() []EnhancedConnection
	ConnectionCount() int

	// Advanced connection management.
	GetConnectionsByStatus(status string) []EnhancedConnection
	KickConnection(ctx context.Context, connID string, reason string) error
	GetConnectionInfo(connID string) (*ConnectionInfo, error)
	GetIdleConnections(idleFor time.Duration) []EnhancedConnection
	CleanupIdleConnections(ctx context.Context, idleFor time.Duration) (int, error)

	// Room operations.
	CreateRoom(ctx context.Context, room Room) error
	GetRoom(ctx context.Context, roomID string) (Room, error)
	DeleteRoom(ctx context.Context, roomID string) error
	JoinRoom(ctx context.Context, connID, roomID string) error
	LeaveRoom(ctx context.Context, connID, roomID string) error
	GetRoomMembers(ctx context.Context, roomID string) ([]Member, error)
	ListRooms(ctx context.Context) ([]Room, error)

	// Extended room management.
	UpdateRoom(ctx context.Context, roomID string, updates map[string]any) error
	SearchRooms(ctx context.Context, query string, filters map[string]any) ([]Room, error)
	GetPublicRooms(ctx context.Context, limit int) ([]Room, error)
	GetUserRoomCount(ctx context.Context, userID string) (int, error)
	ArchiveRoom(ctx context.Context, roomID string) error
	RestoreRoom(ctx context.Context, roomID string) error
	TransferRoomOwnership(ctx context.Context, roomID, newOwnerID string) error

	// Channel operations.
	CreateChannel(ctx context.Context, channel Channel) error
	GetChannel(ctx context.Context, channelID string) (Channel, error)
	DeleteChannel(ctx context.Context, channelID string) error
	Subscribe(ctx context.Context, connID, channelID string, filters map[string]any) error
	Unsubscribe(ctx context.Context, connID, channelID string) error
	ListChannels(ctx context.Context) ([]Channel, error)

	// Extended channel management.
	UpdateChannel(ctx context.Context, channelID string, updates map[string]any) error
	GetChannelSubscribers(ctx context.Context, channelID string) ([]string, error)
	GetUserChannels(ctx context.Context, userID string) ([]Channel, error)

	// Message broadcasting.
	Broadcast(ctx context.Context, message *Message) error
	BroadcastToRoom(ctx context.Context, roomID string, message *Message) error
	BroadcastToChannel(ctx context.Context, channelID string, message *Message) error
	SendToUser(ctx context.Context, userID string, message *Message) error
	SendToConnection(ctx context.Context, connID string, message *Message) error

	// Bulk broadcasting.
	BroadcastToUsers(ctx context.Context, userIDs []string, message *Message) error
	BroadcastToRooms(ctx context.Context, roomIDs []string, message *Message) error
	BroadcastExcept(ctx context.Context, message *Message, excludeUserIDs []string) error
	BulkJoinRoom(ctx context.Context, connIDs []string, roomID string) error

	// Presence operations.
	SetPresence(ctx context.Context, userID, status string) error
	GetPresence(ctx context.Context, userID string) (*UserPresence, error)
	GetOnlineUsers(ctx context.Context, roomID string) ([]string, error)
	TrackActivity(ctx context.Context, userID string) error

	// Extended presence.
	GetPresenceForUsers(ctx context.Context, userIDs []string) ([]*UserPresence, error)
	SetCustomStatus(ctx context.Context, userID, customStatus string) error
	GetOnlineCount(ctx context.Context) (int, error)
	GetPresenceInRooms(ctx context.Context, roomIDs []string) (map[string][]string, error)

	// Typing operations.
	StartTyping(ctx context.Context, userID, roomID string) error
	StopTyping(ctx context.Context, userID, roomID string) error
	GetTypingUsers(ctx context.Context, roomID string) ([]string, error)

	// Extended typing.
	GetTypingUsersInChannels(ctx context.Context, channelIDs []string) (map[string][]string, error)
	IsTyping(ctx context.Context, userID, roomID string) (bool, error)
	ClearTyping(ctx context.Context, userID string) error

	// Message history.
	SaveMessage(ctx context.Context, message *Message) error
	GetHistory(ctx context.Context, roomID string, query HistoryQuery) ([]*Message, error)

	// Extended message history.
	GetThreadHistory(ctx context.Context, roomID, threadID string, query HistoryQuery) ([]*Message, error)
	GetUserMessages(ctx context.Context, userID string, query HistoryQuery) ([]*Message, error)
	SearchMessages(ctx context.Context, roomID, searchTerm string, query HistoryQuery) ([]*Message, error)
	DeleteMessage(ctx context.Context, messageID string) error
	EditMessage(ctx context.Context, messageID string, newContent any) error

	// Reactions/interactions.
	AddReaction(ctx context.Context, messageID, userID, emoji string) error
	RemoveReaction(ctx context.Context, messageID, userID, emoji string) error
	GetReactions(ctx context.Context, messageID string) (map[string][]string, error)

	// Moderation.
	MuteUser(ctx context.Context, userID, roomID string, duration time.Duration) error
	UnmuteUser(ctx context.Context, userID, roomID string) error
	BanUser(ctx context.Context, userID, roomID string, reason string, until *time.Time) error
	UnbanUser(ctx context.Context, userID, roomID string) error
	GetModerationLog(ctx context.Context, roomID string, limit int) ([]ModerationEvent, error)

	// Rate limiting.
	CheckRateLimit(ctx context.Context, userID string, action string) (bool, error)
	GetRateLimitStatus(ctx context.Context, userID string) (*RateLimitStatus, error)

	// Statistics & monitoring.
	GetStats(ctx context.Context) (*ManagerStats, error)
	GetRoomStats(ctx context.Context, roomID string) (*RoomStats, error)
	GetUserStats(ctx context.Context, userID string) (*UserStats, error)
	GetActiveRooms(ctx context.Context, since time.Duration) ([]Room, error)

	// Direct messaging.
	CreateDirectMessage(ctx context.Context, fromUserID, toUserID string) (string, error)
	GetDirectMessages(ctx context.Context, userID string) ([]Room, error)
	IsDirectMessage(ctx context.Context, roomID string) (bool, error)

	// Lifecycle.
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Health(ctx context.Context) error
}
Manager is the central interface for managing streaming connections, rooms, channels, presence, and message broadcasting.
type ManagerStats ¶
// ManagerStats provides overall streaming statistics.
type ManagerStats struct {
	TotalConnections int           `json:"total_connections"`
	TotalRooms       int           `json:"total_rooms"`
	TotalChannels    int           `json:"total_channels"`
	TotalMessages    int64         `json:"total_messages"`
	OnlineUsers      int           `json:"online_users"`
	MessagesPerSec   float64       `json:"messages_per_sec"`
	Uptime           time.Duration `json:"uptime"`
	MemoryUsage      int64         `json:"memory_usage,omitempty"`
}
ManagerStats provides overall streaming statistics.
type Member ¶
// Member represents a room member with role and permissions.
type Member interface {
	// Read accessors.
	GetUserID() string
	GetRole() string
	GetJoinedAt() time.Time
	GetPermissions() []string

	// Role and permission management.
	SetRole(role string)
	HasPermission(permission string) bool
	GrantPermission(permission string)
	RevokePermission(permission string)

	// Metadata access.
	GetMetadata() map[string]any
	SetMetadata(key string, value any)
}
Member represents a room member with role and permissions.
type MemberOptions ¶
// MemberOptions contains configuration for adding a member.
type MemberOptions struct {
	UserID      string         `json:"user_id"`
	Role        string         `json:"role"`
	Permissions []string       `json:"permissions,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}
MemberOptions contains configuration for adding a member.
type Message ¶
// Message represents a streaming message with rich metadata.
type Message struct {
	ID        string         `json:"id"`
	Type      string         `json:"type"` // "message", "presence", "typing", "system", "join", "leave", "error"
	Event     string         `json:"event,omitempty"`
	RoomID    string         `json:"room_id,omitempty"`
	ChannelID string         `json:"channel_id,omitempty"`
	UserID    string         `json:"user_id"`
	Data      any            `json:"data"`
	Metadata  map[string]any `json:"metadata,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
	ThreadID  string         `json:"thread_id,omitempty"` // For threaded conversations
}
Message represents a streaming message with rich metadata.
type MessageEdit ¶
// MessageEdit represents an edit to a message.
// NewContent and PrevContent are untyped (any); only PrevContent is tagged
// omitempty, so a nil PrevContent is left out of the JSON form.
type MessageEdit struct {
MessageID string `json:"message_id"`
NewContent any `json:"new_content"`
EditedBy string `json:"edited_by"`
EditedAt time.Time `json:"edited_at"`
PrevContent any `json:"prev_content,omitempty"`
}
MessageEdit represents an edit to a message.
type MessageError ¶
MessageError wraps message-related errors with context.
func (*MessageError) Error ¶
func (e *MessageError) Error() string
func (*MessageError) Unwrap ¶
func (e *MessageError) Unwrap() error
type MessageFilter ¶
// MessageFilter provides advanced filtering for message retrieval.
// Unlike most structs in this package it carries no struct tags.
// NOTE(review): how zero values are treated (presumably "no constraint" for
// that criterion) is not shown here — confirm against the consuming store.
type MessageFilter struct {
// Time range
StartTime time.Time
EndTime time.Time
// User filter
UserIDs []string
// Message types
Types []string
// Thread filter
ThreadID string
// Search query
SearchTerm string
// Metadata filters
Metadata map[string]any
}
MessageFilter provides advanced filtering for message retrieval.
type MessageHandler ¶
MessageHandler processes messages from distributed channels.
type MessageReaction ¶
// MessageReaction represents a reaction to a message.
// All four fields are always emitted in JSON (no omitempty tags).
type MessageReaction struct {
MessageID string `json:"message_id"`
UserID string `json:"user_id"`
Emoji string `json:"emoji"`
Timestamp time.Time `json:"timestamp"`
}
MessageReaction represents a reaction to a message.
type MessageSearchQuery ¶
// MessageSearchQuery represents a query for searching messages.
// Query is the only field that is always emitted in JSON; every other field
// is tagged omitempty.
type MessageSearchQuery struct {
Query string `json:"query"`
RoomID string `json:"room_id,omitempty"`
UserID string `json:"user_id,omitempty"`
// Before and After are pointers, so a nil bound is genuinely omitted from
// JSON (omitempty applies to nil pointers).
Before *time.Time `json:"before,omitempty"`
After *time.Time `json:"after,omitempty"`
// With omitempty a zero Limit or Offset is not serialized.
// NOTE(review): the default applied for a zero Limit is not shown here.
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
Filters map[string]any `json:"filters,omitempty"`
}
MessageSearchQuery represents a query for searching messages.
type MessageStats ¶
// MessageStats provides statistics about messages.
// CompressedSize and Extra are tagged omitempty; all other fields are always
// emitted in JSON.
type MessageStats struct {
TotalMessages int64 `json:"total_messages"`
MessagesByType map[string]int64 `json:"messages_by_type"`
MessagesByUser map[string]int64 `json:"messages_by_user"`
// NOTE(review): the key format of MessagesPerDay (presumably a date string)
// is not defined here — confirm with the producer.
MessagesPerDay map[string]int64 `json:"messages_per_day"`
AveragePerDay float64 `json:"average_per_day"`
OldestMessage time.Time `json:"oldest_message"`
NewestMessage time.Time `json:"newest_message"`
StorageSize int64 `json:"storage_size"` // Bytes
CompressedSize int64 `json:"compressed_size,omitempty"`
Extra map[string]any `json:"extra,omitempty"`
}
MessageStats provides statistics about messages.
type MessageStore ¶
// MessageStore provides backend storage for message history and persistence.
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error.
type MessageStore interface {
// Save and retrieve messages
Save(ctx context.Context, message *Message) error
SaveBatch(ctx context.Context, messages []*Message) error
Get(ctx context.Context, messageID string) (*Message, error)
Delete(ctx context.Context, messageID string) error
// History retrieval
GetHistory(ctx context.Context, roomID string, query HistoryQuery) ([]*Message, error)
GetThreadHistory(ctx context.Context, roomID, threadID string, query HistoryQuery) ([]*Message, error)
GetUserMessages(ctx context.Context, userID string, query HistoryQuery) ([]*Message, error)
// Search
Search(ctx context.Context, roomID, searchTerm string, query HistoryQuery) ([]*Message, error)
// Statistics
GetMessageCount(ctx context.Context, roomID string) (int64, error)
GetMessageCountByUser(ctx context.Context, roomID, userID string) (int64, error)
// Cleanup
// DeleteOld takes an age (time.Duration), not an absolute cutoff time.
DeleteOld(ctx context.Context, olderThan time.Duration) error
DeleteByRoom(ctx context.Context, roomID string) error
DeleteByUser(ctx context.Context, userID string) error
// Lifecycle
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
}
MessageStore provides backend storage for message history and persistence.
type MessageStoreOptions ¶
// MessageStoreOptions contains configuration for message persistence.
// The struct carries no struct tags. DefaultMessageStoreOptions returns a
// populated value; the defaults themselves are not visible in this listing.
type MessageStoreOptions struct {
// Enable message persistence
Enabled bool
// Retention period for messages
RetentionPeriod time.Duration
// Cleanup interval
CleanupInterval time.Duration
// Maximum messages per room
MaxMessagesPerRoom int64
// Index messages for search
EnableSearch bool
// Compress old messages
CompressOld bool
// Compress messages older than this duration
CompressAfter time.Duration
}
MessageStoreOptions contains configuration for message persistence.
func DefaultMessageStoreOptions ¶
func DefaultMessageStoreOptions() MessageStoreOptions
DefaultMessageStoreOptions returns default message store options.
type ModerationEvent ¶
// ModerationEvent represents a moderation action in a room.
// TargetID, Reason and Metadata are tagged omitempty; the remaining fields
// are always emitted in JSON.
type ModerationEvent struct {
ID string `json:"id"`
// Type is a plain string rather than a dedicated enum type; the trailing
// comment lists the values used.
Type string `json:"type"` // "ban", "unban", "mute", "kick", "warn", "slow_mode"
RoomID string `json:"room_id"`
TargetID string `json:"target_id,omitempty"`
ModeratorID string `json:"moderator_id"`
Reason string `json:"reason,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
ModerationEvent represents a moderation action in a room.
type ModerationStatus ¶
// ModerationStatus represents the status of a moderation action.
type ModerationStatus string

// Defined ModerationStatus values. Each constant's underlying string is the
// lowercase form of its name suffix.
const (
	ModerationStatusPending  ModerationStatus = "pending"
	ModerationStatusApproved ModerationStatus = "approved"
	ModerationStatusRejected ModerationStatus = "rejected"
	ModerationStatusActive   ModerationStatus = "active"
	ModerationStatusExpired  ModerationStatus = "expired"
)
type NodeChangeEvent ¶
// NodeChangeEvent describes a change to a cluster node. Type distinguishes
// the kind of change and Node holds the NodeInfo of the affected node,
// embedded by value.
type NodeChangeEvent struct {
Type string `json:"type"` // "joined", "left", "updated"
Node NodeInfo `json:"node"`
Timestamp time.Time `json:"timestamp"`
}
NodeChangeEvent represents a node joining, leaving, or being updated in the cluster.
type NodeChangeHandler ¶
// NodeChangeHandler is called when nodes join or leave the cluster.
// The event is passed by value and the handler returns nothing, so it cannot
// report an error to its caller.
type NodeChangeHandler func(event NodeChangeEvent)
NodeChangeHandler is called when nodes join or leave the cluster.
type NodeInfo ¶
// NodeInfo contains information about a node in the cluster.
// Metadata is the only field tagged omitempty; all others are always emitted
// in JSON.
type NodeInfo struct {
ID string `json:"id"`
// NOTE(review): the Address format (host:port vs. URL) is not specified here.
Address string `json:"address"`
Started time.Time `json:"started"`
LastSeen time.Time `json:"last_seen"`
Connections int `json:"connections"`
Rooms int `json:"rooms"`
Channels int `json:"channels"`
Version string `json:"version"`
Metadata map[string]any `json:"metadata,omitempty"`
}
NodeInfo contains information about a node in the cluster.
type OnlineStats ¶
// OnlineStats provides statistics about online users.
// ByTimezone is tagged omitempty; ByStatus is not, so a nil ByStatus map
// encodes as JSON null rather than being dropped.
type OnlineStats struct {
Current int `json:"current"`
Peak24h int `json:"peak_24h"`
Average24h float64 `json:"average_24h"`
ByStatus map[string]int `json:"by_status"`
ByTimezone map[string]int `json:"by_timezone,omitempty"`
Trend string `json:"trend"` // "increasing", "decreasing", "stable"
}
OnlineStats provides statistics about online users.
type Pagination ¶
Pagination handles result pagination.
type PresenceEvent ¶
// PresenceEvent represents a presence change event.
// NOTE(review): Type and Status are both plain strings, and the values listed
// for Type look like status names — confirm how the two fields differ.
type PresenceEvent struct {
Type string `json:"type"` // "online", "offline", "away", "busy"
UserID string `json:"user_id"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
PresenceEvent represents a presence change event.
type PresenceFilters ¶
// PresenceFilters defines filters for querying presence data.
// Every field is tagged omitempty.
type PresenceFilters struct {
Status []string `json:"status,omitempty"`
// With omitempty a false value is never serialized, so the JSON form cannot
// distinguish "Online=false" from "not set".
Online bool `json:"online,omitempty"`
// NOTE(review): omitempty has no effect on time.Time — encoding/json never
// treats a struct as empty, so the zero time is still written. A pointer
// (as MessageSearchQuery uses for Before/After) would actually be omitted.
SinceActivity time.Time `json:"since_activity,omitempty"`
RoomID string `json:"room_id,omitempty"`
DeviceType string `json:"device_type,omitempty"`
}
PresenceFilters defines filters for querying presence data.
type PresenceOptions ¶
// PresenceOptions contains configuration for presence tracking.
// The struct carries no struct tags. DefaultPresenceOptions returns a
// populated value; the defaults themselves are not visible in this listing.
type PresenceOptions struct {
// Timeout after which user is considered offline
OfflineTimeout time.Duration
// Interval for cleanup of expired presence
CleanupInterval time.Duration
// Broadcast presence changes to rooms
BroadcastChanges bool
// Track activity automatically
AutoTrackActivity bool
}
PresenceOptions contains configuration for presence tracking.
func DefaultPresenceOptions ¶
func DefaultPresenceOptions() PresenceOptions
DefaultPresenceOptions returns default presence tracking options.
type PresenceStore ¶
// PresenceStore provides backend storage for presence data.
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error.
type PresenceStore interface {
// Set and get presence
Set(ctx context.Context, userID string, presence *UserPresence) error
Get(ctx context.Context, userID string) (*UserPresence, error)
GetMultiple(ctx context.Context, userIDs []string) ([]*UserPresence, error)
Delete(ctx context.Context, userID string) error
// Online users
GetOnline(ctx context.Context) ([]string, error)
// SetOnline is the only online/offline call that carries a TTL.
SetOnline(ctx context.Context, userID string, ttl time.Duration) error
SetOffline(ctx context.Context, userID string) error
IsOnline(ctx context.Context, userID string) (bool, error)
// Activity tracking
UpdateActivity(ctx context.Context, userID string, timestamp time.Time) error
GetLastActivity(ctx context.Context, userID string) (time.Time, error)
// Cleanup
// olderThan is an age (time.Duration), not an absolute cutoff time.
CleanupExpired(ctx context.Context, olderThan time.Duration) error
// Bulk operations
// NOTE(review): the presences map is presumably keyed by user ID — confirm.
SetMultiple(ctx context.Context, presences map[string]*UserPresence) error
DeleteMultiple(ctx context.Context, userIDs []string) error
// Advanced queries
GetByStatus(ctx context.Context, status string) ([]*UserPresence, error)
GetRecent(ctx context.Context, status string, since time.Duration) ([]*UserPresence, error)
GetWithFilters(ctx context.Context, filters PresenceFilters) ([]*UserPresence, error)
// History
SaveHistory(ctx context.Context, userID string, event *PresenceEvent) error
GetHistory(ctx context.Context, userID string, limit int) ([]*PresenceEvent, error)
GetHistorySince(ctx context.Context, userID string, since time.Time) ([]*PresenceEvent, error)
// Device tracking
SetDevice(ctx context.Context, userID, deviceID string, device DeviceInfo) error
GetDevices(ctx context.Context, userID string) ([]DeviceInfo, error)
RemoveDevice(ctx context.Context, userID, deviceID string) error
// Statistics
// NOTE(review): the CountByStatus result is presumably keyed by status string — confirm.
CountByStatus(ctx context.Context) (map[string]int, error)
GetActiveCount(ctx context.Context, since time.Duration) (int, error)
// Lifecycle
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
}
PresenceStore provides backend storage for presence data.
type PresenceTracker ¶
// PresenceTracker manages user presence tracking.
// Every method takes a context.Context as its first argument and returns an
// error as its last result. Status values are passed as plain strings.
type PresenceTracker interface {
// Set and get presence
SetPresence(ctx context.Context, userID, status string) error
GetPresence(ctx context.Context, userID string) (*UserPresence, error)
GetPresences(ctx context.Context, userIDs []string) ([]*UserPresence, error)
// Activity tracking
TrackActivity(ctx context.Context, userID string) error
GetLastSeen(ctx context.Context, userID string) (time.Time, error)
// Online users
GetOnlineUsers(ctx context.Context) ([]string, error)
GetOnlineUsersInRoom(ctx context.Context, roomID string) ([]string, error)
IsOnline(ctx context.Context, userID string) (bool, error)
// Custom status
SetCustomStatus(ctx context.Context, userID, customStatus string) error
GetCustomStatus(ctx context.Context, userID string) (string, error)
// Broadcasting
BroadcastPresence(ctx context.Context, roomID, userID, status string) error
// Cleanup expired presence
CleanupExpired(ctx context.Context) error
// Bulk Operations
// NOTE(review): updates is presumably user ID -> status, and the bulk results
// are presumably keyed by room ID / user ID respectively — confirm.
SetPresenceForUsers(ctx context.Context, updates map[string]string) error
GetPresenceForRooms(ctx context.Context, roomIDs []string) (map[string][]*UserPresence, error)
GetPresenceBulk(ctx context.Context, userIDs []string) (map[string]*UserPresence, error)
// Advanced Queries
GetUsersByStatus(ctx context.Context, status string) ([]string, error)
GetRecentlyOnline(ctx context.Context, since time.Duration) ([]string, error)
GetRecentlyOffline(ctx context.Context, since time.Duration) ([]string, error)
GetAwayUsers(ctx context.Context) ([]string, error)
GetBusyUsers(ctx context.Context) ([]string, error)
// Presence History
GetPresenceHistory(ctx context.Context, userID string, since time.Time) ([]*PresenceEvent, error)
GetStatusChanges(ctx context.Context, userID string, limit int) ([]*PresenceEvent, error)
// Device Management
AddDevice(ctx context.Context, userID, deviceID string, deviceInfo DeviceInfo) error
RemoveDevice(ctx context.Context, userID, deviceID string) error
GetDevices(ctx context.Context, userID string) ([]DeviceInfo, error)
GetActiveDevices(ctx context.Context, userID string) ([]DeviceInfo, error)
// Rich Presence
SetRichPresence(ctx context.Context, userID string, richData map[string]any) error
GetRichPresence(ctx context.Context, userID string) (map[string]any, error)
SetActivity(ctx context.Context, userID string, activity *ActivityInfo) error
GetActivity(ctx context.Context, userID string) (*ActivityInfo, error)
// Availability
SetAvailability(ctx context.Context, userID string, available bool, message string) error
// IsAvailable returns a bool and a string before the error; the pair mirrors
// the (available, message) arguments of SetAvailability.
IsAvailable(ctx context.Context, userID string) (bool, string, error)
// Time-based
GetOnlineUsersAt(ctx context.Context, timestamp time.Time) ([]string, error)
GetPresenceDuration(ctx context.Context, userID string, status string, since time.Time) (time.Duration, error)
// Watching/Subscriptions
WatchUser(ctx context.Context, watcherID, watchedUserID string) error
UnwatchUser(ctx context.Context, watcherID, watchedUserID string) error
GetWatchers(ctx context.Context, userID string) ([]string, error)
GetWatching(ctx context.Context, userID string) ([]string, error)
// Statistics
GetOnlineStats(ctx context.Context) (*OnlineStats, error)
GetPresenceStats(ctx context.Context, userID string) (*UserPresenceStats, error)
// Lifecycle
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
PresenceTracker manages user presence tracking (online/away/busy/offline status).
type RateLimitStatus ¶
// RateLimitStatus represents rate limiting status for a user.
// All four fields are always emitted in JSON (no omitempty tags).
type RateLimitStatus struct {
Allowed bool `json:"allowed"`
Remaining int `json:"remaining"`
ResetAt time.Time `json:"reset_at"`
// time.Duration has no custom JSON encoding, so encoding/json writes
// RetryAfter as an integer count of nanoseconds, not seconds.
RetryAfter time.Duration `json:"retry_after"`
}
RateLimitStatus represents rate limiting status for a user.
type Room ¶
// Room represents a chat room or group for organizing conversations.
// The interface mixes two styles of method: plain accessors that take no
// context and return a bare value (GetID, IsPrivate, GetTags, ...), and
// operations that take a context.Context and return an error.
type Room interface {
// Identity
GetID() string
GetName() string
GetDescription() string
GetOwner() string
GetCreated() time.Time
GetUpdated() time.Time
GetMetadata() map[string]any
// Membership
Join(ctx context.Context, userID, role string) error
Leave(ctx context.Context, userID string) error
IsMember(ctx context.Context, userID string) (bool, error)
GetMembers(ctx context.Context) ([]Member, error)
GetMember(ctx context.Context, userID string) (Member, error)
MemberCount(ctx context.Context) (int, error)
// Advanced Membership Management
GetMembersByRole(ctx context.Context, role string) ([]Member, error)
UpdateMemberRole(ctx context.Context, userID, newRole string) error
TransferOwnership(ctx context.Context, newOwnerID string) error
// until is a pointer.
// NOTE(review): nil presumably means a ban with no expiry — confirm.
BanMember(ctx context.Context, userID string, reason string, until *time.Time) error
UnbanMember(ctx context.Context, userID string) error
IsBanned(ctx context.Context, userID string) (bool, error)
GetBannedMembers(ctx context.Context) ([]RoomBan, error)
// Mutes take a duration, whereas bans take an optional absolute time.
MuteMember(ctx context.Context, userID string, duration time.Duration) error
UnmuteMember(ctx context.Context, userID string) error
IsMuted(ctx context.Context, userID string) (bool, error)
// Invitations
CreateInvite(ctx context.Context, opts InviteOptions) (*Invite, error)
RevokeInvite(ctx context.Context, inviteCode string) error
ValidateInvite(ctx context.Context, inviteCode string) (bool, error)
GetInvites(ctx context.Context) ([]*Invite, error)
JoinWithInvite(ctx context.Context, userID, inviteCode string) error
// Permissions
HasPermission(ctx context.Context, userID, permission string) (bool, error)
GrantPermission(ctx context.Context, userID, permission string) error
RevokePermission(ctx context.Context, userID, permission string) error
// Room Settings
IsPrivate() bool
SetPrivate(ctx context.Context, private bool) error
GetMaxMembers() int
SetMaxMembers(ctx context.Context, max int) error
IsArchived() bool
Archive(ctx context.Context) error
Unarchive(ctx context.Context) error
IsLocked() bool
Lock(ctx context.Context, reason string) error
Unlock(ctx context.Context) error
// Moderation
GetModerationLog(ctx context.Context, limit int) ([]ModerationEvent, error)
SetSlowMode(ctx context.Context, intervalSeconds int) error
// NOTE(review): GetSlowMode is the only method here that accepts a context
// yet returns no error, so a failed lookup cannot be reported to the caller.
GetSlowMode(ctx context.Context) int
// Pinned Messages
PinMessage(ctx context.Context, messageID string) error
UnpinMessage(ctx context.Context, messageID string) error
// NOTE(review): the returned strings are presumably message IDs — confirm.
GetPinnedMessages(ctx context.Context) ([]string, error)
// Categories/Tags
AddTag(ctx context.Context, tag string) error
RemoveTag(ctx context.Context, tag string) error
GetTags() []string
SetCategory(ctx context.Context, category string) error
GetCategory() string
// Statistics
GetMessageCount(ctx context.Context) (int64, error)
GetActiveMembers(ctx context.Context, since time.Duration) ([]Member, error)
// Messaging
Broadcast(ctx context.Context, message *Message) error
BroadcastExcept(ctx context.Context, message *Message, excludeUserIDs []string) error
BroadcastToRole(ctx context.Context, message *Message, role string) error
// Read Receipts
MarkAsRead(ctx context.Context, userID, messageID string) error
GetUnreadCount(ctx context.Context, userID string, since time.Time) (int, error)
GetLastReadMessage(ctx context.Context, userID string) (string, error)
// Lifecycle
Update(ctx context.Context, updates map[string]any) error
Delete(ctx context.Context) error
}
Room represents a chat room or group for organizing conversations.
type RoomBan ¶
// RoomBan represents a banned user in a room.
// ExpiresAt and Metadata are tagged omitempty; the other fields are always
// emitted in JSON.
type RoomBan struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
Reason string `json:"reason"`
BannedBy string `json:"banned_by"`
BannedAt time.Time `json:"banned_at"`
// ExpiresAt is a pointer, so a nil value is omitted from JSON.
// NOTE(review): nil presumably means the ban does not expire — confirm.
ExpiresAt *time.Time `json:"expires_at,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
RoomBan represents a banned user in a room.
type RoomEvent ¶
// RoomEvent represents an event that occurs in a room.
// UserID and Data are tagged omitempty; Type, RoomID and Timestamp are always
// emitted in JSON.
type RoomEvent struct {
Type string `json:"type"` // "created", "updated", "deleted", "member_joined", "member_left"
RoomID string `json:"room_id"`
UserID string `json:"user_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
Data map[string]any `json:"data,omitempty"`
}
RoomEvent represents an event that occurs in a room.
type RoomOptions ¶
// RoomOptions contains configuration for creating a room.
// Name, Owner and Private are always emitted in JSON; ID, Description,
// MaxMembers and Metadata are tagged omitempty.
type RoomOptions struct {
// NOTE(review): an empty ID presumably lets the creator generate one — confirm.
ID string `json:"id,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Owner string `json:"owner"`
Private bool `json:"private"`
// NOTE(review): the meaning of a zero MaxMembers (presumably "no limit") is
// not shown here.
MaxMembers int `json:"max_members,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
RoomOptions contains configuration for creating a room.
type RoomStats ¶
// RoomStats provides statistics for a specific room.
// All fields are always emitted in JSON (no omitempty tags).
type RoomStats struct {
TotalMessages int64 `json:"total_messages"`
TotalMembers int `json:"total_members"`
ActiveMembers int `json:"active_members"`
MessagesToday int64 `json:"messages_today"`
// NOTE(review): the averaging window for AverageMessages is not specified here.
AverageMessages float64 `json:"average_messages"`
PeakOnline int `json:"peak_online"`
CreatedAt time.Time `json:"created_at"`
LastActivity time.Time `json:"last_activity"`
}
RoomStats provides statistics for a specific room.
type RoomStore ¶
// RoomStore provides backend storage for rooms.
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error. Rooms and members are exchanged as the
// Room and Member interfaces rather than concrete types.
type RoomStore interface {
// CRUD operations
Create(ctx context.Context, room Room) error
Get(ctx context.Context, roomID string) (Room, error)
Update(ctx context.Context, roomID string, updates map[string]any) error
Delete(ctx context.Context, roomID string) error
// NOTE(review): the recognized keys of the untyped filters map are not defined here.
List(ctx context.Context, filters map[string]any) ([]Room, error)
Exists(ctx context.Context, roomID string) (bool, error)
// Bulk operations
CreateMany(ctx context.Context, rooms []Room) error
DeleteMany(ctx context.Context, roomIDs []string) error
// Membership operations
AddMember(ctx context.Context, roomID string, member Member) error
RemoveMember(ctx context.Context, roomID, userID string) error
GetMembers(ctx context.Context, roomID string) ([]Member, error)
GetMember(ctx context.Context, roomID, userID string) (Member, error)
IsMember(ctx context.Context, roomID, userID string) (bool, error)
MemberCount(ctx context.Context, roomID string) (int, error)
// User's rooms
GetUserRooms(ctx context.Context, userID string) ([]Room, error)
GetUserRoomsByRole(ctx context.Context, userID, role string) ([]Room, error)
GetCommonRooms(ctx context.Context, userID1, userID2 string) ([]Room, error)
// Advanced queries
Search(ctx context.Context, query string, filters map[string]any) ([]Room, error)
FindByTag(ctx context.Context, tag string) ([]Room, error)
FindByCategory(ctx context.Context, category string) ([]Room, error)
GetPublicRooms(ctx context.Context, limit int) ([]Room, error)
GetArchivedRooms(ctx context.Context, userID string) ([]Room, error)
// Statistics
GetRoomCount(ctx context.Context) (int, error)
GetTotalMembers(ctx context.Context) (int, error)
// Bans and invites
// BanMember receives roomID and userID as arguments even though RoomBan
// carries RoomID and UserID fields of its own.
BanMember(ctx context.Context, roomID, userID string, ban RoomBan) error
UnbanMember(ctx context.Context, roomID, userID string) error
GetBans(ctx context.Context, roomID string) ([]RoomBan, error)
IsBanned(ctx context.Context, roomID, userID string) (bool, error)
// Invites are saved and listed per room, but fetched and deleted by invite
// code alone.
SaveInvite(ctx context.Context, roomID string, invite *Invite) error
GetInvite(ctx context.Context, inviteCode string) (*Invite, error)
DeleteInvite(ctx context.Context, inviteCode string) error
ListInvites(ctx context.Context, roomID string) ([]*Invite, error)
// Lifecycle
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
}
RoomStore provides backend storage for rooms.
type Subscription ¶
// Subscription represents a connection's subscription to a channel.
// None of its methods take a context.Context or return an error.
type Subscription interface {
GetConnID() string
GetUserID() string
GetSubscribedAt() time.Time
// Filters are read and replaced as a whole map; there is no per-key setter.
GetFilters() map[string]any
SetFilters(filters map[string]any)
// MatchesFilter reports a yes/no result for a single message.
// NOTE(review): presumably evaluated against the current filters — confirm.
MatchesFilter(message *Message) bool
}
Subscription represents a connection's subscription to a channel.
type SubscriptionOptions ¶
// SubscriptionOptions contains configuration for subscribing to a channel.
// ConnID and UserID are always emitted in JSON; Filters is tagged omitempty.
type SubscriptionOptions struct {
ConnID string `json:"conn_id"`
UserID string `json:"user_id"`
Filters map[string]any `json:"filters,omitempty"` // Message filters (e.g., event type, user ID)
}
SubscriptionOptions contains configuration for subscribing to a channel.
type TypingEvent ¶
// TypingEvent represents a typing indicator event.
// All four fields are always emitted in JSON (no omitempty tags).
type TypingEvent struct {
Type string `json:"type"` // "started", "stopped"
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
Timestamp time.Time `json:"timestamp"`
}
TypingEvent represents a typing indicator event.
type TypingOptions ¶
// TypingOptions contains configuration for typing indicators.
// The struct carries no struct tags. DefaultTypingOptions returns a populated
// value; the defaults themselves are not visible in this listing.
type TypingOptions struct {
// Timeout after which typing indicator expires
TypingTimeout time.Duration
// Interval for cleanup of expired indicators
CleanupInterval time.Duration
// Broadcast typing events
BroadcastEvents bool
// Max typing users to track per room (prevents abuse)
MaxTypingUsers int
}
TypingOptions contains configuration for typing indicators.
func DefaultTypingOptions ¶
func DefaultTypingOptions() TypingOptions
DefaultTypingOptions returns default typing indicator options.
type TypingStore ¶
// TypingStore provides backend storage for typing indicators.
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error.
type TypingStore interface {
// Set and get typing status
// SetTyping receives an absolute expiry time rather than a TTL duration.
SetTyping(ctx context.Context, userID, roomID string, expiresAt time.Time) error
RemoveTyping(ctx context.Context, userID, roomID string) error
GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
IsTyping(ctx context.Context, userID, roomID string) (bool, error)
// Cleanup
CleanupExpired(ctx context.Context) error
// Lifecycle
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
}
TypingStore provides backend storage for typing indicators.
type TypingTracker ¶
// TypingTracker manages typing indicator tracking.
// Every method takes a context.Context as its first argument and reports
// failure through a trailing error. All per-scope methods are keyed by roomID.
type TypingTracker interface {
// Start and stop typing
StartTyping(ctx context.Context, userID, roomID string) error
StopTyping(ctx context.Context, userID, roomID string) error
// Get typing users
GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
IsTyping(ctx context.Context, userID, roomID string) (bool, error)
// Broadcasting
// Note the argument order: roomID comes before userID here, the reverse of
// StartTyping/StopTyping/IsTyping. Both are strings, so a swap compiles.
BroadcastTyping(ctx context.Context, roomID, userID string, isTyping bool) error
// Cleanup expired typing indicators
CleanupExpired(ctx context.Context) error
// Lifecycle
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
TypingTracker manages typing indicator tracking per room/channel.
type UserPresence ¶
// UserPresence represents a user's online presence status.
// CustomStatus and Metadata are tagged omitempty; the other fields are always
// emitted in JSON.
type UserPresence struct {
UserID string `json:"user_id"`
Status string `json:"status"` // "online", "away", "busy", "offline"
LastSeen time.Time `json:"last_seen"`
// Connections has no omitempty, so a nil slice encodes as JSON null.
// NOTE(review): elements are presumably connection IDs — confirm.
Connections []string `json:"connections"`
CustomStatus string `json:"custom_status,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
UserPresence represents a user's online presence status.
type UserPresenceStats ¶
// UserPresenceStats provides detailed statistics for a user's presence.
// Metadata is the only field tagged omitempty.
type UserPresenceStats struct {
// Both durations are time.Duration values, which encoding/json writes as
// integer nanosecond counts.
TotalOnlineTime time.Duration `json:"total_online_time"`
AverageOnlineTime time.Duration `json:"average_online_time"`
LastOnline time.Time `json:"last_online"`
StatusChanges int `json:"status_changes"`
MostCommonStatus string `json:"most_common_status"`
DeviceCount int `json:"device_count"`
Metadata map[string]any `json:"metadata,omitempty"`
}
UserPresenceStats provides detailed statistics for a user's presence.
type UserStats ¶
// UserStats provides statistics for a specific user.
// All fields are always emitted in JSON (no omitempty tags).
type UserStats struct {
MessagesSent int64 `json:"messages_sent"`
RoomsJoined int `json:"rooms_joined"`
// time.Duration is written by encoding/json as an integer nanosecond count.
OnlineTime time.Duration `json:"online_time"`
LastSeen time.Time `json:"last_seen"`
// NOTE(review): the unit and window of AverageActivity are not specified here.
AverageActivity float64 `json:"average_activity"`
}
UserStats provides statistics for a specific user.
type WebhookConfig ¶
// WebhookConfig represents webhook configuration.
// Secret, Headers, Timeout and RetryCount are tagged omitempty; the remaining
// fields are always emitted in JSON.
type WebhookConfig struct {
ID string `json:"id"`
URL string `json:"url"`
Events []string `json:"events"`
// NOTE(review): Secret is serialized whenever it is non-empty. Confirm this
// struct is never marshaled into responses or logs visible to untrusted
// parties, or switch the tag to json:"-" on outbound representations.
Secret string `json:"secret,omitempty"`
Active bool `json:"active"`
Headers map[string]string `json:"headers,omitempty"`
// time.Duration is written by encoding/json as an integer nanosecond count;
// a zero Timeout is omitted.
Timeout time.Duration `json:"timeout,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
WebhookConfig represents webhook configuration.