coordinator

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MessageTypeBroadcast      = "broadcast"
	MessageTypePresenceUpdate = "presence.update"
	MessageTypeRoomStateSync  = "room.state.sync"
	MessageTypeMemberJoin     = "room.member.join"
	MessageTypeMemberLeave    = "room.member.leave"
	MessageTypeNodeRegister   = "node.register"
	MessageTypeNodeUnregister = "node.unregister"
)

MessageType defines coordinator message types.

Variables

This section is empty.

Functions

This section is empty.

Types

type CoordinatorMessage

type CoordinatorMessage struct {
	Type      string
	NodeID    string
	UserID    string
	RoomID    string
	ChannelID string
	Payload   any
	Timestamp time.Time
}

CoordinatorMessage represents a message in the coordination system.

type MessageHandler

type MessageHandler func(ctx context.Context, msg *CoordinatorMessage) error

MessageHandler handles coordinator messages.

type PresenceSynchronizer

type PresenceSynchronizer struct {
	// contains filtered or unexported fields
}

PresenceSynchronizer synchronizes presence across nodes.

func NewPresenceSynchronizer

func NewPresenceSynchronizer(
	coordinator StreamCoordinator,
	store streaming.PresenceStore,
	interval time.Duration,
) *PresenceSynchronizer

NewPresenceSynchronizer creates a presence synchronizer.

func (*PresenceSynchronizer) HandlePresenceUpdate

func (ps *PresenceSynchronizer) HandlePresenceUpdate(ctx context.Context, presence *streaming.UserPresence) error

HandlePresenceUpdate handles incoming presence updates from coordinator.

func (*PresenceSynchronizer) OnPresenceChange

func (ps *PresenceSynchronizer) OnPresenceChange(ctx context.Context, event *streaming.PresenceEvent) error

OnPresenceChange broadcasts presence change to all nodes.

func (*PresenceSynchronizer) Start

func (ps *PresenceSynchronizer) Start(ctx context.Context) error

Start begins periodic presence sync.

func (*PresenceSynchronizer) Stop

func (ps *PresenceSynchronizer) Stop(ctx context.Context) error

Stop stops the synchronizer.

func (*PresenceSynchronizer) SyncUserPresence

func (ps *PresenceSynchronizer) SyncUserPresence(ctx context.Context, userID string) error

SyncUserPresence syncs single user.

type RoomState

type RoomState struct {
	RoomID    string
	Members   []string
	Settings  map[string]any
	UpdatedAt time.Time
	Version   int64
}

RoomState represents room state for synchronization.

type RoomStateSynchronizer

type RoomStateSynchronizer struct {
	// contains filtered or unexported fields
}

RoomStateSynchronizer synchronizes room state across nodes.

func NewRoomStateSynchronizer

func NewRoomStateSynchronizer(
	coordinator StreamCoordinator,
	store streaming.RoomStore,
) *RoomStateSynchronizer

NewRoomStateSynchronizer creates a room state synchronizer.

func (*RoomStateSynchronizer) HandleMemberJoin

func (rss *RoomStateSynchronizer) HandleMemberJoin(ctx context.Context, roomID, userID string) error

HandleMemberJoin broadcasts join event.

func (*RoomStateSynchronizer) HandleMemberLeave

func (rss *RoomStateSynchronizer) HandleMemberLeave(ctx context.Context, roomID, userID string) error

HandleMemberLeave broadcasts leave event.

func (*RoomStateSynchronizer) HandleRoomStateUpdate

func (rss *RoomStateSynchronizer) HandleRoomStateUpdate(ctx context.Context, state *RoomState) error

HandleRoomStateUpdate handles incoming room state updates.

func (*RoomStateSynchronizer) ResolveConflict

func (rss *RoomStateSynchronizer) ResolveConflict(ctx context.Context, local, remote *RoomState) (*RoomState, error)

ResolveConflict resolves state conflicts using last-write-wins.

func (*RoomStateSynchronizer) SyncRoomMembers

func (rss *RoomStateSynchronizer) SyncRoomMembers(ctx context.Context, roomID string) error

SyncRoomMembers syncs member list across nodes.

func (*RoomStateSynchronizer) SyncRoomSettings

func (rss *RoomStateSynchronizer) SyncRoomSettings(ctx context.Context, roomID string) error

SyncRoomSettings syncs room configuration.

type StreamCoordinator

type StreamCoordinator interface {
	// BroadcastToNode sends message to specific node
	BroadcastToNode(ctx context.Context, nodeID string, msg *streaming.Message) error

	// BroadcastToUser sends to user across all nodes
	BroadcastToUser(ctx context.Context, userID string, msg *streaming.Message) error

	// BroadcastToRoom sends to room across all nodes
	BroadcastToRoom(ctx context.Context, roomID string, msg *streaming.Message) error

	// BroadcastGlobal sends to all nodes
	BroadcastGlobal(ctx context.Context, msg *streaming.Message) error

	// SyncPresence synchronizes presence across nodes
	SyncPresence(ctx context.Context, presence *streaming.UserPresence) error

	// SyncRoomState synchronizes room state
	SyncRoomState(ctx context.Context, roomID string, state *RoomState) error

	// GetUserNodes returns nodes where user is connected
	GetUserNodes(ctx context.Context, userID string) ([]string, error)

	// GetRoomNodes returns nodes serving room
	GetRoomNodes(ctx context.Context, roomID string) ([]string, error)

	// RegisterNode registers this node
	RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error

	// UnregisterNode unregisters this node
	UnregisterNode(ctx context.Context, nodeID string) error

	// Subscribe subscribes to coordinator events
	Subscribe(ctx context.Context, handler MessageHandler) error

	// Start starts the coordinator
	Start(ctx context.Context) error

	// Stop stops the coordinator
	Stop(ctx context.Context) error
}

StreamCoordinator coordinates streaming across multiple nodes.

func NewNATSCoordinator

func NewNATSCoordinator(conn *nats.Conn, nodeID string) (StreamCoordinator, error)

NewNATSCoordinator creates a NATS-based coordinator.

func NewRedisCoordinator

func NewRedisCoordinator(client *redis.Client, nodeID string) StreamCoordinator

NewRedisCoordinator creates a Redis-based coordinator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL