Documentation
¶
Index ¶
- Constants
- type CoordinatorMessage
- type MessageHandler
- type PresenceSynchronizer
- func (ps *PresenceSynchronizer) HandlePresenceUpdate(ctx context.Context, presence *streaming.UserPresence) error
- func (ps *PresenceSynchronizer) OnPresenceChange(ctx context.Context, event *streaming.PresenceEvent) error
- func (ps *PresenceSynchronizer) Start(ctx context.Context) error
- func (ps *PresenceSynchronizer) Stop(ctx context.Context) error
- func (ps *PresenceSynchronizer) SyncUserPresence(ctx context.Context, userID string) error
- type RoomState
- type RoomStateSynchronizer
- func (rss *RoomStateSynchronizer) HandleMemberJoin(ctx context.Context, roomID, userID string) error
- func (rss *RoomStateSynchronizer) HandleMemberLeave(ctx context.Context, roomID, userID string) error
- func (rss *RoomStateSynchronizer) HandleRoomStateUpdate(ctx context.Context, state *RoomState) error
- func (rss *RoomStateSynchronizer) ResolveConflict(ctx context.Context, local, remote *RoomState) (*RoomState, error)
- func (rss *RoomStateSynchronizer) SyncRoomMembers(ctx context.Context, roomID string) error
- func (rss *RoomStateSynchronizer) SyncRoomSettings(ctx context.Context, roomID string) error
- type StreamCoordinator
Constants ¶
const ( MessageTypeBroadcast = "broadcast" MessageTypePresenceUpdate = "presence.update" MessageTypeRoomStateSync = "room.state.sync" MessageTypeMemberJoin = "room.member.join" MessageTypeMemberLeave = "room.member.leave" MessageTypeNodeRegister = "node.register" MessageTypeNodeUnregister = "node.unregister" )
The MessageType* constants define the coordinator message types.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CoordinatorMessage ¶
type CoordinatorMessage struct {
// Type is the message type as a string.
// NOTE(review): presumably one of the MessageType* constants — confirm.
Type string
// NodeID identifies a node.
// NOTE(review): whether this is the sending or the target node is not
// visible from this declaration — confirm against the implementations.
NodeID string
// UserID identifies the user the message relates to, if any.
UserID string
// RoomID identifies the room the message relates to, if any.
RoomID string
// ChannelID identifies the channel the message relates to, if any.
ChannelID string
// Payload carries the message body. It is typed as any, so the concrete
// type is not constrained by this struct.
Payload any
// Timestamp is the time associated with the message.
// NOTE(review): presumably the creation/send time — confirm.
Timestamp time.Time
}
CoordinatorMessage represents a message in the coordination system.
type MessageHandler ¶
// A MessageHandler is called with a context and a *CoordinatorMessage and
// returns an error. It is the handler type accepted by
// StreamCoordinator.Subscribe.
type MessageHandler func(ctx context.Context, msg *CoordinatorMessage) error
MessageHandler handles coordinator messages.
type PresenceSynchronizer ¶
type PresenceSynchronizer struct {
// contains filtered or unexported fields
}
PresenceSynchronizer synchronizes presence across nodes.
func NewPresenceSynchronizer ¶
func NewPresenceSynchronizer( coordinator StreamCoordinator, store streaming.PresenceStore, interval time.Duration, ) *PresenceSynchronizer
NewPresenceSynchronizer creates a presence synchronizer.
func (*PresenceSynchronizer) HandlePresenceUpdate ¶
func (ps *PresenceSynchronizer) HandlePresenceUpdate(ctx context.Context, presence *streaming.UserPresence) error
HandlePresenceUpdate handles incoming presence updates from the coordinator.
func (*PresenceSynchronizer) OnPresenceChange ¶
func (ps *PresenceSynchronizer) OnPresenceChange(ctx context.Context, event *streaming.PresenceEvent) error
OnPresenceChange broadcasts a presence change to all nodes.
func (*PresenceSynchronizer) Start ¶
func (ps *PresenceSynchronizer) Start(ctx context.Context) error
Start begins periodic presence sync.
func (*PresenceSynchronizer) Stop ¶
func (ps *PresenceSynchronizer) Stop(ctx context.Context) error
Stop stops the synchronizer.
func (*PresenceSynchronizer) SyncUserPresence ¶
func (ps *PresenceSynchronizer) SyncUserPresence(ctx context.Context, userID string) error
SyncUserPresence syncs the presence of a single user.
type RoomState ¶
type RoomState struct {
// RoomID identifies the room this state belongs to.
RoomID string
// Members lists the room's members as strings.
// NOTE(review): presumably user IDs, matching the userID parameter of
// HandleMemberJoin/HandleMemberLeave — confirm.
Members []string
// Settings holds the room configuration as string keys mapped to
// arbitrary values.
Settings map[string]any
// UpdatedAt is the time of the last update to this state.
// NOTE(review): ResolveConflict is documented as last-write-wins; confirm
// whether it compares UpdatedAt, Version, or both.
UpdatedAt time.Time
// Version is a version number for this state. How it is assigned or
// incremented is not visible from this declaration.
Version int64
}
RoomState represents room state for synchronization.
type RoomStateSynchronizer ¶
type RoomStateSynchronizer struct {
// contains filtered or unexported fields
}
RoomStateSynchronizer synchronizes room state across nodes.
func NewRoomStateSynchronizer ¶
func NewRoomStateSynchronizer( coordinator StreamCoordinator, store streaming.RoomStore, ) *RoomStateSynchronizer
NewRoomStateSynchronizer creates a room state synchronizer.
func (*RoomStateSynchronizer) HandleMemberJoin ¶
func (rss *RoomStateSynchronizer) HandleMemberJoin(ctx context.Context, roomID, userID string) error
HandleMemberJoin broadcasts a member join event.
func (*RoomStateSynchronizer) HandleMemberLeave ¶
func (rss *RoomStateSynchronizer) HandleMemberLeave(ctx context.Context, roomID, userID string) error
HandleMemberLeave broadcasts a member leave event.
func (*RoomStateSynchronizer) HandleRoomStateUpdate ¶
func (rss *RoomStateSynchronizer) HandleRoomStateUpdate(ctx context.Context, state *RoomState) error
HandleRoomStateUpdate handles incoming room state updates.
func (*RoomStateSynchronizer) ResolveConflict ¶
func (rss *RoomStateSynchronizer) ResolveConflict(ctx context.Context, local, remote *RoomState) (*RoomState, error)
ResolveConflict resolves state conflicts using last-write-wins.
func (*RoomStateSynchronizer) SyncRoomMembers ¶
func (rss *RoomStateSynchronizer) SyncRoomMembers(ctx context.Context, roomID string) error
SyncRoomMembers syncs the room's member list across nodes.
func (*RoomStateSynchronizer) SyncRoomSettings ¶
func (rss *RoomStateSynchronizer) SyncRoomSettings(ctx context.Context, roomID string) error
SyncRoomSettings syncs room configuration.
type StreamCoordinator ¶
type StreamCoordinator interface {
// BroadcastToNode sends msg to the specific node identified by nodeID.
BroadcastToNode(ctx context.Context, nodeID string, msg *streaming.Message) error
// BroadcastToUser sends msg to the user identified by userID across all
// nodes.
BroadcastToUser(ctx context.Context, userID string, msg *streaming.Message) error
// BroadcastToRoom sends msg to the room identified by roomID across all
// nodes.
BroadcastToRoom(ctx context.Context, roomID string, msg *streaming.Message) error
// BroadcastGlobal sends msg to all nodes.
BroadcastGlobal(ctx context.Context, msg *streaming.Message) error
// SyncPresence synchronizes the given user presence across nodes.
SyncPresence(ctx context.Context, presence *streaming.UserPresence) error
// SyncRoomState synchronizes the given state for the room identified by
// roomID.
SyncRoomState(ctx context.Context, roomID string, state *RoomState) error
// GetUserNodes returns the nodes where the user identified by userID is
// connected.
GetUserNodes(ctx context.Context, userID string) ([]string, error)
// GetRoomNodes returns the nodes serving the room identified by roomID.
GetRoomNodes(ctx context.Context, roomID string) ([]string, error)
// RegisterNode registers this node under nodeID with the supplied
// metadata.
RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error
// UnregisterNode unregisters this node, identified by nodeID.
UnregisterNode(ctx context.Context, nodeID string) error
// Subscribe subscribes handler to coordinator events.
Subscribe(ctx context.Context, handler MessageHandler) error
// Start starts the coordinator.
Start(ctx context.Context) error
// Stop stops the coordinator.
Stop(ctx context.Context) error
}
StreamCoordinator coordinates streaming across multiple nodes.
func NewNATSCoordinator ¶
func NewNATSCoordinator(conn *nats.Conn, nodeID string) (StreamCoordinator, error)
NewNATSCoordinator creates a NATS-based coordinator.
func NewRedisCoordinator ¶
func NewRedisCoordinator(client *redis.Client, nodeID string) StreamCoordinator
NewRedisCoordinator creates a Redis-based coordinator.