Documentation
¶
Overview ¶
Package cluster provides cluster configuration and management utilities, as well as network utility functions for subnet matching.
Index ¶
- Constants
- func GenerateNodeID() (string, error)
- func GetAllLocalIPs() ([]string, error)
- func GetCurrentTime() time.Time
- func IsSameSubnet(ip1, ip2 string) bool
- func SaveAppConfig(workspace string, cfg *AppConfig) error
- func SaveConfig(configPath string, config *StaticConfig) error
- func SaveDynamicState(statePath string, state *DynamicState) error
- func SaveStaticConfig(configPath string, config *StaticConfig) error
- type ActionSchema
- type AppConfig
- type Cluster
- func (c *Cluster) Call(peerID, action string, payload map[string]interface{}) ([]byte, error)
- func (c *Cluster) CallWithContext(ctx context.Context, peerID, action string, payload map[string]interface{}) ([]byte, error)
- func (c *Cluster) CleanupTask(taskID string)
- func (c *Cluster) FindPeersByCapability(capability string) []*Node
- func (c *Cluster) GetActionsSchema() []interface{}
- func (c *Cluster) GetActionsSchemaJSON() ([]byte, error)
- func (c *Cluster) GetAddress() string
- func (c *Cluster) GetAllLocalIPs() []string
- func (c *Cluster) GetCapabilities() []string
- func (c *Cluster) GetCategory() string
- func (c *Cluster) GetContinuationStore() *ContinuationStore
- func (c *Cluster) GetLocalNetworkInterfaces() ([]rpc.LocalNetworkInterface, error)
- func (c *Cluster) GetLogger() *ClusterLogger
- func (c *Cluster) GetNodeID() string
- func (c *Cluster) GetOnlinePeers() []interface{}
- func (c *Cluster) GetPeer(peerID string) (interface{}, error)
- func (c *Cluster) GetPorts() (udpPort, rpcPort int)
- func (c *Cluster) GetRPCChannel() *channels.RPCChannel
- func (c *Cluster) GetRPCPort() int
- func (c *Cluster) GetRegistry() interface{}
- func (c *Cluster) GetRole() string
- func (c *Cluster) GetTags() []string
- func (c *Cluster) GetTask(taskID string) (*Task, error)
- func (c *Cluster) GetTaskResultStore() *TaskResultStore
- func (c *Cluster) GetTaskResultStorer() rpc.TaskResultStorer
- func (c *Cluster) GetWorkspace() string
- func (c *Cluster) HandleDiscoveredNode(nodeID, name string, addresses []string, rpcPort int, role, category string, ...)
- func (c *Cluster) HandleNodeOffline(nodeID, reason string)
- func (c *Cluster) HandleTaskCompleteForTest(taskID string)
- func (c *Cluster) IsRunning() bool
- func (c *Cluster) LogDebug(msg string, args ...interface{})
- func (c *Cluster) LogError(msg string, args ...interface{})
- func (c *Cluster) LogInfo(msg string, args ...interface{})
- func (c *Cluster) LogRPCDebug(msg string, args ...interface{})
- func (c *Cluster) LogRPCError(msg string, args ...interface{})
- func (c *Cluster) LogRPCInfo(msg string, args ...interface{})
- func (c *Cluster) RegisterBasicHandlers() error
- func (c *Cluster) RegisterRPCHandler(action string, ...) error
- func (c *Cluster) SetMessageBus(bus *bus.MessageBus)
- func (c *Cluster) SetPorts(udpPort, rpcPort int)
- func (c *Cluster) SetRPCChannel(rpcCh *channels.RPCChannel)
- func (c *Cluster) SetTaskManagerForTest(tm *TaskManager)
- func (c *Cluster) Start() error
- func (c *Cluster) Stop() error
- func (c *Cluster) SubmitTask(ctx context.Context, peerID, action string, payload map[string]interface{}, ...) (string, error)
- func (c *Cluster) SyncToDisk() error
- type ClusterConfig
- type ClusterLogger
- func (l *ClusterLogger) Close() error
- func (l *ClusterLogger) DiscoveryDebug(format string, args ...interface{})
- func (l *ClusterLogger) DiscoveryError(format string, args ...interface{})
- func (l *ClusterLogger) DiscoveryInfo(format string, args ...interface{})
- func (l *ClusterLogger) LogRPCDebug(msg string, args ...interface{})
- func (l *ClusterLogger) LogRPCError(msg string, args ...interface{})
- func (l *ClusterLogger) LogRPCInfo(msg string, args ...interface{})
- func (l *ClusterLogger) RPCDebug(format string, args ...interface{})
- func (l *ClusterLogger) RPCError(format string, args ...interface{})
- func (l *ClusterLogger) RPCInfo(format string, args ...interface{})
- type ClusterMeta
- type ContinuationSnapshot
- type ContinuationStore
- func (s *ContinuationStore) CleanupOld(maxAge time.Duration) (int, error)
- func (s *ContinuationStore) Delete(taskID string) error
- func (s *ContinuationStore) ListPending() ([]string, error)
- func (s *ContinuationStore) Load(taskID string) (*ContinuationSnapshot, error)
- func (s *ContinuationStore) Save(snapshot *ContinuationSnapshot) error
- type DynamicState
- type InMemoryTaskStore
- func (s *InMemoryTaskStore) Create(task *Task) error
- func (s *InMemoryTaskStore) Delete(taskID string) error
- func (s *InMemoryTaskStore) Get(taskID string) (*Task, error)
- func (s *InMemoryTaskStore) ListByStatus(status TaskStatus) ([]*Task, error)
- func (s *InMemoryTaskStore) UpdateResult(taskID string, result *TaskResult) error
- type Logger
- type NetworkInterface
- type Node
- func (n *Node) GetAddress() string
- func (n *Node) GetAddresses() []string
- func (n *Node) GetCapabilities() []string
- func (n *Node) GetID() string
- func (n *Node) GetName() string
- func (n *Node) GetNodeStatus() NodeStatus
- func (n *Node) GetRPCPort() int
- func (n *Node) GetStatus() string
- func (n *Node) GetUptime() time.Duration
- func (n *Node) HasCapability(capability string) bool
- func (n *Node) IsOnline() bool
- func (n *Node) MarkOffline(reason string)
- func (n *Node) SetStatus(status NodeStatus)
- func (n *Node) String() string
- func (n *Node) ToConfig() PeerConfig
- func (n *Node) UpdateLastSeen()
- type NodeInfo
- type NodeStatus
- type PeerConfig
- type PeerStatus
- type Registry
- func (r *Registry) AddOrUpdate(node *Node)
- func (r *Registry) CheckTimeouts(timeout time.Duration) []string
- func (r *Registry) Count() int
- func (r *Registry) FindByCapability(capability string) []*Node
- func (r *Registry) FindByCapabilityOnline(capability string) []*Node
- func (r *Registry) Get(nodeID string) *Node
- func (r *Registry) GetAll() []*Node
- func (r *Registry) GetCapabilities() []string
- func (r *Registry) GetOnline() []*Node
- func (r *Registry) MarkOffline(nodeID string, reason string)
- func (r *Registry) OnlineCount() int
- func (r *Registry) Remove(nodeID string)
- type StaticConfig
- func CreateStaticConfig(nodeID, nodeName, address string) *StaticConfig
- func DefaultConfig(nodeID string) *StaticConfig
- func LoadConfig(configPath string) (*StaticConfig, error)
- func LoadOrCreateConfig(configPath string, nodeID string) (*StaticConfig, error)
- func LoadStaticConfig(configPath string) (*StaticConfig, error)
- type Task
- type TaskManager
- func (tm *TaskManager) CompleteCallback(taskID, status, response, errMsg string) error
- func (tm *TaskManager) CompleteTask(taskID string, result *TaskResult) error
- func (tm *TaskManager) GetTask(taskID string) (*Task, error)
- func (tm *TaskManager) ListPendingTasks() ([]*Task, error)
- func (tm *TaskManager) SetOnComplete(fn func(taskID string))
- func (tm *TaskManager) Start()
- func (tm *TaskManager) Stop()
- func (tm *TaskManager) Submit(task *Task) error
- type TaskResult
- type TaskResultEntry
- type TaskResultIndex
- type TaskResultStore
- type TaskStatus
- type TaskStore
Constants ¶
const (
	// DefaultUDPPort is the default UDP broadcast port
	DefaultUDPPort = 11949
	// DefaultRPCPort is the default WebSocket RPC port
	DefaultRPCPort = 21949
	// DefaultBroadcastInterval is the default broadcast interval
	DefaultBroadcastInterval = 30 * time.Second
	// DefaultTimeout is the default timeout for marking a node as offline
	DefaultTimeout = 90 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func GenerateNodeID ¶
GenerateNodeID generates a unique node ID based on hostname and timestamp
func GetAllLocalIPs ¶
GetAllLocalIPs returns all local IP addresses for broadcast. Returns non-virtual interfaces only, sorted by priority (Ethernet > WiFi > Other). This is used for UDP discovery broadcast to tell other nodes how to reach us.
func GetCurrentTime ¶
GetCurrentTime returns the current time (wrapper for time.Now()). This is useful for testing and consistency.
func IsSameSubnet ¶
IsSameSubnet checks if two IP addresses are in the same subnet. Uses real subnet masks from local network interfaces. Returns true if they are in the same subnet, false otherwise.
func SaveAppConfig ¶
SaveAppConfig saves cluster application configuration to workspace/config/config.cluster.json
func SaveConfig ¶
func SaveConfig(configPath string, config *StaticConfig) error
SaveConfig saves the cluster configuration (legacy alias)
func SaveDynamicState ¶
func SaveDynamicState(statePath string, state *DynamicState) error
SaveDynamicState saves the dynamic cluster state to state.toml
func SaveStaticConfig ¶
func SaveStaticConfig(configPath string, config *StaticConfig) error
SaveStaticConfig saves the static cluster configuration to peers.toml
Types ¶
type ActionSchema ¶
type ActionSchema struct {
Name string `json:"name"`
Description string `json:"description"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
Returns map[string]interface{} `json:"returns,omitempty"`
Examples []map[string]interface{} `json:"examples,omitempty"`
}
ActionSchema 定义了单个 action 的完整 schema
type AppConfig ¶
type AppConfig struct {
Enabled bool `json:"enabled" env:"CLUSTER_ENABLED"`
Port int `json:"port" env:"CLUSTER_UDP_PORT"`
RPCPort int `json:"rpc_port" env:"CLUSTER_RPC_PORT"`
BroadcastInterval int `json:"broadcast_interval" env:"CLUSTER_BROADCAST_INTERVAL"`
}
AppConfig contains cluster application configuration (loaded from config.cluster.json). This is separate from ClusterConfig, which contains runtime state (peers.toml).
func DefaultAppConfig ¶
func DefaultAppConfig() *AppConfig
DefaultAppConfig returns the default cluster application configuration
func LoadAppConfig ¶
LoadAppConfig loads cluster application configuration from workspace/config/config.cluster.json. If the file doesn't exist, loads from embedded default config.
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster represents the bot cluster
func NewCluster ¶
NewCluster creates a new cluster instance
func (*Cluster) CallWithContext ¶
func (c *Cluster) CallWithContext(ctx context.Context, peerID, action string, payload map[string]interface{}) ([]byte, error)
CallWithContext makes an RPC call to a peer with context support for cancellation and timeout
func (*Cluster) CleanupTask ¶
CleanupTask 清理已完成的任务(由续行流程调用)
func (*Cluster) FindPeersByCapability ¶
FindPeersByCapability returns nodes with a specific capability
func (*Cluster) GetActionsSchema ¶
func (c *Cluster) GetActionsSchema() []interface{}
GetActionsSchema 返回所有可用 actions 的 schema 定义
func (*Cluster) GetActionsSchemaJSON ¶
GetActionsSchemaJSON 返回 actions schema 的 JSON 格式
func (*Cluster) GetAddress ¶
GetAddress returns the RPC address of this node
func (*Cluster) GetAllLocalIPs ¶
GetAllLocalIPs returns all local IP addresses (for discovery callback)
func (*Cluster) GetCapabilities ¶
GetCapabilities returns all capabilities from all nodes
func (*Cluster) GetCategory ¶
GetCategory returns the node category (for discovery callback)
func (*Cluster) GetContinuationStore ¶
func (c *Cluster) GetContinuationStore() *ContinuationStore
GetContinuationStore 暴露续行快照存储给 AgentLoop
func (*Cluster) GetLocalNetworkInterfaces ¶
func (c *Cluster) GetLocalNetworkInterfaces() ([]rpc.LocalNetworkInterface, error)
GetLocalNetworkInterfaces returns local network interfaces (for RPC client)
func (*Cluster) GetLogger ¶
func (c *Cluster) GetLogger() *ClusterLogger
GetLogger returns the cluster logger
func (*Cluster) GetOnlinePeers ¶
func (c *Cluster) GetOnlinePeers() []interface{}
GetOnlinePeers returns all online nodes as interface slice (for RPC compatibility)
func (*Cluster) GetRPCChannel ¶
func (c *Cluster) GetRPCChannel() *channels.RPCChannel
GetRPCChannel returns the RPC channel (may be nil if not configured)
func (*Cluster) GetRPCPort ¶
GetRPCPort returns the RPC port (for discovery callback)
func (*Cluster) GetRegistry ¶
func (c *Cluster) GetRegistry() interface{}
GetRegistry returns the registry (as interface for RPC compatibility)
func (*Cluster) GetTaskResultStore ¶
func (c *Cluster) GetTaskResultStore() *TaskResultStore
GetTaskResultStore 暴露任务结果存储给 PeerChatHandler
func (*Cluster) GetTaskResultStorer ¶
func (c *Cluster) GetTaskResultStorer() rpc.TaskResultStorer
GetTaskResultStorer 满足 rpc.Cluster 接口(返回 TaskResultStorer)
func (*Cluster) GetWorkspace ¶
GetWorkspace returns the workspace path
func (*Cluster) HandleDiscoveredNode ¶
func (c *Cluster) HandleDiscoveredNode(nodeID, name string, addresses []string, rpcPort int, role, category string, tags []string, capabilities []string)
HandleDiscoveredNode handles a node discovered via UDP broadcast
func (*Cluster) HandleNodeOffline ¶
HandleNodeOffline handles a node going offline
func (*Cluster) HandleTaskCompleteForTest ¶
HandleTaskCompleteForTest 暴露 handleTaskComplete 给测试代码
func (*Cluster) LogRPCDebug ¶
LogRPCDebug logs an RPC debug message (for RPC callback)
func (*Cluster) LogRPCError ¶
LogRPCError logs an RPC error message (for RPC callback)
func (*Cluster) LogRPCInfo ¶
LogRPCInfo logs an RPC info message (for RPC callback)
func (*Cluster) RegisterBasicHandlers ¶
RegisterBasicHandlers registers basic RPC handlers (default and custom). This can be called directly in daemon mode where RPCChannel is not available.
func (*Cluster) RegisterRPCHandler ¶
func (c *Cluster) RegisterRPCHandler(action string, handler func(payload map[string]interface{}) (map[string]interface{}, error)) error
RegisterRPCHandler registers an RPC handler for an action. This allows external code to register custom RPC handlers.
func (*Cluster) SetMessageBus ¶
func (c *Cluster) SetMessageBus(bus *bus.MessageBus)
SetMessageBus 设置消息总线(Phase 2: 由 AgentLoop 在 setupClusterRPCChannel 中调用)
func (*Cluster) SetRPCChannel ¶
func (c *Cluster) SetRPCChannel(rpcCh *channels.RPCChannel)
SetRPCChannel sets the RPC channel and triggers LLM handler registration. This is called by loop.go after creating the RPCChannel.
Thread safety: This method releases its lock before registering handlers, to avoid deadlock:
- Acquires lock only to set c.rpcChannel and read state
- Releases lock before calling registerPeerChatHandlers()
- This avoids deadlock: registerPeerChatHandlers() internally calls RegisterRPCHandler() which tries to acquire a read lock while we might be holding a write lock
There's a tiny race window between Unlock() and registerPeerChatHandlers() where Stop() or server shutdown could occur. This is acceptable because:
- It's extremely short (microseconds)
- Worst case: LLM handlers don't get registered, but no deadlock occurs
- RegisterRPCHandler() has its own state checks and will return an error if not running
func (*Cluster) SetTaskManagerForTest ¶
func (c *Cluster) SetTaskManagerForTest(tm *TaskManager)
SetTaskManagerForTest 设置 TaskManager(测试专用,允许注入自定义 TaskManager)
func (*Cluster) SubmitTask ¶
func (c *Cluster) SubmitTask(ctx context.Context, peerID, action string, payload map[string]interface{}, channel, chatID string) (string, error)
SubmitTask 提交异步 RPC 任务。发送请求到目标节点(短同步调用获取 ACK),返回 taskID。channel 和 chatID 记录发起方的通道信息,用于续行通知路由。
func (*Cluster) SyncToDisk ¶
SyncToDisk saves the current state to state.toml (dynamic state only)
type ClusterLogger ¶
type ClusterLogger struct {
// contains filtered or unexported fields
}
ClusterLogger manages logging for the cluster module
func NewClusterLogger ¶
func NewClusterLogger(workspace string) (*ClusterLogger, error)
NewClusterLogger creates a new cluster logger
func (*ClusterLogger) DiscoveryDebug ¶
func (l *ClusterLogger) DiscoveryDebug(format string, args ...interface{})
func (*ClusterLogger) DiscoveryError ¶
func (l *ClusterLogger) DiscoveryError(format string, args ...interface{})
func (*ClusterLogger) DiscoveryInfo ¶
func (l *ClusterLogger) DiscoveryInfo(format string, args ...interface{})
Discovery logging methods
func (*ClusterLogger) LogRPCDebug ¶
func (l *ClusterLogger) LogRPCDebug(msg string, args ...interface{})
LogRPCDebug logs an RPC debug message (aliases for handlers.Logger interface)
func (*ClusterLogger) LogRPCError ¶
func (l *ClusterLogger) LogRPCError(msg string, args ...interface{})
LogRPCError logs an RPC error message (aliases for handlers.Logger interface)
func (*ClusterLogger) LogRPCInfo ¶
func (l *ClusterLogger) LogRPCInfo(msg string, args ...interface{})
LogRPCInfo logs an RPC info message (aliases for handlers.Logger interface)
func (*ClusterLogger) RPCDebug ¶
func (l *ClusterLogger) RPCDebug(format string, args ...interface{})
func (*ClusterLogger) RPCError ¶
func (l *ClusterLogger) RPCError(format string, args ...interface{})
func (*ClusterLogger) RPCInfo ¶
func (l *ClusterLogger) RPCInfo(format string, args ...interface{})
RPC logging methods
type ClusterMeta ¶
type ClusterMeta struct {
ID string `toml:"id"`
AutoDiscovery bool `toml:"auto_discovery"`
LastUpdated time.Time `toml:"last_updated"`
RPCAuthToken string `toml:"rpc_auth_token"` // RPC authentication token
}
ClusterMeta contains cluster metadata
type ContinuationSnapshot ¶
type ContinuationSnapshot struct {
TaskID string `json:"task_id"`
Messages json.RawMessage `json:"messages"` // []providers.Message 的原始 JSON
ToolCallID string `json:"tool_call_id"` // 触发异步的 tool call ID
Channel string `json:"channel"` // 原始通道
ChatID string `json:"chat_id"` // 原始会话 ID
CreatedAt time.Time `json:"created_at"` // 创建时间(用于清理)
}
ContinuationSnapshot 续行快照(存储到磁盘)
type ContinuationStore ¶
type ContinuationStore struct {
// contains filtered or unexported fields
}
ContinuationStore 续行快照文件存储
func NewContinuationStore ¶
func NewContinuationStore(workspace string) (*ContinuationStore, error)
NewContinuationStore 创建续行存储
func (*ContinuationStore) CleanupOld ¶
func (s *ContinuationStore) CleanupOld(maxAge time.Duration) (int, error)
CleanupOld 清理超过 maxAge 的快照文件,返回清理数量
func (*ContinuationStore) Delete ¶
func (s *ContinuationStore) Delete(taskID string) error
Delete 删除续行快照文件
func (*ContinuationStore) ListPending ¶
func (s *ContinuationStore) ListPending() ([]string, error)
ListPending 列出所有待处理的快照 taskID(用于启动时恢复)
func (*ContinuationStore) Load ¶
func (s *ContinuationStore) Load(taskID string) (*ContinuationSnapshot, error)
Load 从磁盘加载续行快照
func (*ContinuationStore) Save ¶
func (s *ContinuationStore) Save(snapshot *ContinuationSnapshot) error
Save 保存续行快照到磁盘(原子写入:先写 tmp 再 rename)
type DynamicState ¶
type DynamicState struct {
Cluster ClusterMeta `toml:"cluster"`
LocalNode NodeInfo `toml:"local_node"`
Discovered []PeerConfig `toml:"discovered"`
LastSync time.Time `toml:"last_sync"`
}
DynamicState represents the dynamic cluster state (state.toml). This file is automatically managed by the cluster module. It contains runtime information about discovered peers.
func LoadDynamicState ¶
func LoadDynamicState(statePath string) (*DynamicState, error)
LoadDynamicState loads the dynamic cluster state from state.toml
type InMemoryTaskStore ¶
type InMemoryTaskStore struct {
// contains filtered or unexported fields
}
InMemoryTaskStore 内存实现(Phase 1)
func NewInMemoryTaskStore ¶
func NewInMemoryTaskStore() *InMemoryTaskStore
NewInMemoryTaskStore 创建内存任务存储
func (*InMemoryTaskStore) Create ¶
func (s *InMemoryTaskStore) Create(task *Task) error
Create 创建任务记录
func (*InMemoryTaskStore) Delete ¶
func (s *InMemoryTaskStore) Delete(taskID string) error
Delete 删除任务记录
func (*InMemoryTaskStore) Get ¶
func (s *InMemoryTaskStore) Get(taskID string) (*Task, error)
Get 获取任务记录
func (*InMemoryTaskStore) ListByStatus ¶
func (s *InMemoryTaskStore) ListByStatus(status TaskStatus) ([]*Task, error)
ListByStatus 按状态列出任务
func (*InMemoryTaskStore) UpdateResult ¶
func (s *InMemoryTaskStore) UpdateResult(taskID string, result *TaskResult) error
UpdateResult 更新任务结果
type Logger ¶
type Logger interface {
RPCInfo(format string, args ...interface{})
RPCError(format string, args ...interface{})
RPCDebug(format string, args ...interface{})
}
Logger represents logging functions for RPC operations
type NetworkInterface ¶
type NetworkInterface struct {
IP string
Mask string
NetworkIP string // The network address (IP & Mask)
}
NetworkInterface represents a local network interface with its subnet
func GetLocalNetworkInterfaces ¶
func GetLocalNetworkInterfaces() ([]NetworkInterface, error)
GetLocalNetworkInterfaces returns all local network interfaces with their subnet masks
type Node ¶
type Node struct {
ID string `toml:"id"`
Name string `toml:"name"`
Address string `toml:"address"` // Deprecated: Primary IP:Port (for backward compatibility)
Addresses []string `toml:"addresses"` // List of all IP addresses
RPCPort int `toml:"rpc_port"` // RPC port number
Role string `toml:"role"` // Cluster role
Category string `toml:"category"` // Business category
Tags []string `toml:"tags"` // Custom tags
Capabilities []string `toml:"capabilities"`
Priority int `toml:"priority"`
// Runtime state
Status NodeStatus `toml:"-"`
LastSeen time.Time `toml:"-"`
TasksCompleted int `toml:"-"`
SuccessRate float64 `toml:"-"`
AvgResponseTime int `toml:"-"`
LastError string `toml:"-"`
// contains filtered or unexported fields
}
Node represents a bot in the cluster
func (*Node) GetAddress ¶
GetAddress returns the node address (for RPC interface)
func (*Node) GetAddresses ¶
GetAddresses returns all IP addresses of the node (for RPC interface)
func (*Node) GetCapabilities ¶
GetCapabilities returns the node capabilities (for RPC interface)
func (*Node) GetNodeStatus ¶
func (n *Node) GetNodeStatus() NodeStatus
GetNodeStatus returns the current status as NodeStatus type
func (*Node) GetRPCPort ¶
GetRPCPort returns the RPC port of the node (for RPC interface)
func (*Node) HasCapability ¶
HasCapability checks if the node has a specific capability
func (*Node) MarkOffline ¶
MarkOffline marks the node as offline
func (*Node) SetStatus ¶
func (n *Node) SetStatus(status NodeStatus)
SetStatus updates the node status
func (*Node) ToConfig ¶
func (n *Node) ToConfig() PeerConfig
ToConfig converts Node to PeerConfig for TOML serialization
func (*Node) UpdateLastSeen ¶
func (n *Node) UpdateLastSeen()
UpdateLastSeen updates the last seen timestamp
type NodeInfo ¶
type NodeInfo struct {
ID string `toml:"id"`
Name string `toml:"name"`
Address string `toml:"address"`
Role string `toml:"role"` // Cluster role: manager, coordinator, worker, observer, standby
Category string `toml:"category"` // Business category: design, development, testing, ops, deployment, analysis, general
Tags []string `toml:"tags"` // Custom tags for flexible classification
Capabilities []string `toml:"capabilities"`
}
NodeInfo contains information about the current node
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the current status of a node
const (
	StatusOnline  NodeStatus = "online"
	StatusOffline NodeStatus = "offline"
	StatusUnknown NodeStatus = "unknown"
)
type PeerConfig ¶
type PeerConfig struct {
ID string `toml:"id"`
Name string `toml:"name"`
Address string `toml:"address"` // Deprecated: Primary address for backward compatibility
Addresses []string `toml:"addresses"` // List of all IP addresses
RPCPort int `toml:"rpc_port"` // RPC port number
Role string `toml:"role"` // Cluster role: manager, coordinator, worker, observer, standby
Category string `toml:"category"` // Business category: design, development, testing, ops, deployment, analysis, general
Tags []string `toml:"tags"` // Custom tags for flexible classification
Capabilities []string `toml:"capabilities"`
Priority int `toml:"priority"`
Enabled bool `toml:"enabled"`
Status PeerStatus `toml:"status"`
}
PeerConfig represents a peer node configuration
type PeerStatus ¶
type PeerStatus struct {
State string `toml:"state"`
LastSeen time.Time `toml:"last_seen"`
Uptime string `toml:"uptime"` // Human-readable uptime
TasksCompleted int `toml:"tasks_completed"`
SuccessRate float64 `toml:"success_rate"`
AvgResponseTime int `toml:"avg_response_time"` // milliseconds
LastError string `toml:"last_error"`
}
PeerStatus contains runtime status of a peer
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages the cluster node registry
func (*Registry) AddOrUpdate ¶
AddOrUpdate adds a new node or updates an existing one
func (*Registry) CheckTimeouts ¶
CheckTimeouts checks all nodes and marks as offline those that haven't been seen recently
func (*Registry) FindByCapability ¶
FindByCapability returns nodes that have a specific capability
func (*Registry) FindByCapabilityOnline ¶
FindByCapabilityOnline returns online nodes that have a specific capability
func (*Registry) GetCapabilities ¶
GetCapabilities returns all unique capabilities from all nodes
func (*Registry) MarkOffline ¶
MarkOffline marks a node as offline
func (*Registry) OnlineCount ¶
OnlineCount returns the number of online nodes
type StaticConfig ¶
type StaticConfig struct {
Cluster ClusterMeta `toml:"cluster"`
Node NodeInfo `toml:"node"`
Peers []PeerConfig `toml:"peers"`
}
StaticConfig represents the static cluster configuration (peers.toml). This file is created during onboard and contains the current node's information. Users can manually edit this file to add known peers.
func CreateStaticConfig ¶
func CreateStaticConfig(nodeID, nodeName, address string) *StaticConfig
CreateStaticConfig creates a default static configuration for the current node
func DefaultConfig ¶
func DefaultConfig(nodeID string) *StaticConfig
DefaultConfig creates a default cluster configuration (legacy alias)
func LoadConfig ¶
func LoadConfig(configPath string) (*StaticConfig, error)
LoadConfig loads the cluster configuration (legacy alias)
func LoadOrCreateConfig ¶
func LoadOrCreateConfig(configPath string, nodeID string) (*StaticConfig, error)
LoadOrCreateConfig loads existing config or creates a default one (legacy alias)
func LoadStaticConfig ¶
func LoadStaticConfig(configPath string) (*StaticConfig, error)
LoadStaticConfig loads the static cluster configuration from peers.toml
type Task ¶
type Task struct {
ID string `json:"id"` // 唯一任务 ID
Action string `json:"action"` // RPC action (peer_chat)
PeerID string `json:"peer_id"` // 目标节点 ID
Payload map[string]interface{} `json:"payload"` // 原始请求 payload
Status TaskStatus `json:"status"` // 当前状态
CreatedAt time.Time `json:"created_at"` // 创建时间
CompletedAt *time.Time `json:"completed_at"` // 完成时间
// 结果
Response string `json:"response,omitempty"` // LLM 回复内容
Result map[string]interface{} `json:"result,omitempty"` // 回调结果
Error string `json:"error,omitempty"` // 错误信息
// Phase 2: 原始通道信息(用于续行通知路由)
OriginalChannel string `json:"original_channel,omitempty"` // 发起方的通道(如 "web")
OriginalChatID string `json:"original_chat_id,omitempty"` // 发起方的会话 ID
}
Task 表示一个异步 RPC 任务
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager 任务生命周期管理
func NewTaskManager ¶
func NewTaskManager(cleanupInterval time.Duration) *TaskManager
NewTaskManager 创建任务管理器
func (*TaskManager) CompleteCallback ¶
func (tm *TaskManager) CompleteCallback(taskID, status, response, errMsg string) error
CompleteCallback 实现 handlers.TaskCompleter 接口。将基本类型的回调参数转换为 TaskResult 后调用 CompleteTask。
func (*TaskManager) CompleteTask ¶
func (tm *TaskManager) CompleteTask(taskID string, result *TaskResult) error
CompleteTask 标记任务完成(由 CallbackHandler 调用)
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(taskID string) (*Task, error)
GetTask 获取任务信息
func (*TaskManager) ListPendingTasks ¶
func (tm *TaskManager) ListPendingTasks() ([]*Task, error)
ListPendingTasks 获取所有 pending 状态的任务(H4: recoveryLoop 使用)
func (*TaskManager) SetOnComplete ¶
func (tm *TaskManager) SetOnComplete(fn func(taskID string))
SetOnComplete 设置任务完成回调(Phase 2)
type TaskResult ¶
type TaskResult struct {
TaskID string `json:"task_id"`
Status string `json:"status"` // success | error
Response string `json:"response"`
Result map[string]interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
TaskResult 任务回调结果
type TaskResultEntry ¶
type TaskResultEntry struct {
TaskID string `json:"task_id"`
Status string `json:"status"` // "running" | "done"
ResultStatus string `json:"result_status"` // "success" | "error"(仅 done 时有值)
Response string `json:"response,omitempty"`
Error string `json:"error,omitempty"`
SourceNode string `json:"source_node"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskResultEntry B 端任务结果记录
type TaskResultIndex ¶
type TaskResultIndex struct {
Tasks map[string]*TaskResultEntry `json:"tasks"`
}
TaskResultIndex 磁盘索引
type TaskResultStore ¶
type TaskResultStore struct {
// contains filtered or unexported fields
}
TaskResultStore B 端任务结果持久化存储。running 状态仅存内存(进程重启后丢失,A 端会再次询问)。done 状态写磁盘(数据文件 + 索引文件)。
func NewTaskResultStore ¶
func NewTaskResultStore(workspace string) (*TaskResultStore, error)
NewTaskResultStore 创建任务结果存储
func (*TaskResultStore) Delete ¶
func (s *TaskResultStore) Delete(taskID string) error
Delete 删除任务结果(数据文件 + 索引 + 磁盘)
func (*TaskResultStore) Get ¶
func (s *TaskResultStore) Get(taskID string) *TaskResultEntry
Get 查询任务结果(先查 running,再查索引)
func (*TaskResultStore) SetResult ¶
func (s *TaskResultStore) SetResult(taskID, resultStatus, response, errMsg, sourceNode string) error
SetResult 写入完成结果(数据文件 + 索引 + 磁盘),同时清理 running 标记
func (*TaskResultStore) SetRunning ¶
func (s *TaskResultStore) SetRunning(taskID, sourceNode string)
SetRunning 标记任务为 running(仅内存)
type TaskStatus ¶
type TaskStatus string
TaskStatus 任务状态
const (
	TaskPending   TaskStatus = "pending"   // 已提交,等待 B 处理
	TaskCompleted TaskStatus = "completed" // B 已返回结果
	TaskFailed    TaskStatus = "failed"    // B 处理失败或回调失败
	TaskCancelled TaskStatus = "cancelled" // 主动取消(Phase 2)
)