cluster

package
v0.0.0-...-1f1a93d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package cluster provides network utility functions for subnet matching

Package cluster provides cluster configuration and management utilities

Index

Constants

View Source
// Default network ports and timing values for the cluster module.
const (
	// DefaultUDPPort is the default UDP broadcast port
	DefaultUDPPort = 11949
	// DefaultRPCPort is the default WebSocket RPC port
	DefaultRPCPort = 21949
	// DefaultBroadcastInterval is the default broadcast interval
	DefaultBroadcastInterval = 30 * time.Second
	// DefaultTimeout is the default timeout for marking a node as offline
	// (three times DefaultBroadcastInterval).
	DefaultTimeout = 90 * time.Second
)

Variables

This section is empty.

Functions

func GenerateNodeID

func GenerateNodeID() (string, error)

GenerateNodeID generates a unique node ID based on hostname and timestamp

func GetAllLocalIPs

func GetAllLocalIPs() ([]string, error)

GetAllLocalIPs returns all local IP addresses for broadcast. It returns addresses of non-virtual interfaces only, sorted by priority (Ethernet > WiFi > Other). This is used for UDP discovery broadcast to tell other nodes how to reach us.

func GetCurrentTime

func GetCurrentTime() time.Time

GetCurrentTime returns the current time (a wrapper for time.Now()). This is useful for testing and consistency.

func IsSameSubnet

func IsSameSubnet(ip1, ip2 string) bool

IsSameSubnet checks whether two IP addresses are in the same subnet. It uses the real subnet masks from the local network interfaces. It returns true if they are in the same subnet, false otherwise.

func SaveAppConfig

func SaveAppConfig(workspace string, cfg *AppConfig) error

SaveAppConfig saves cluster application configuration to workspace/config/config.cluster.json

func SaveConfig

func SaveConfig(configPath string, config *StaticConfig) error

SaveConfig saves the cluster configuration (legacy alias)

func SaveDynamicState

func SaveDynamicState(statePath string, state *DynamicState) error

SaveDynamicState saves the dynamic cluster state to state.toml

func SaveStaticConfig

func SaveStaticConfig(configPath string, config *StaticConfig) error

SaveStaticConfig saves the static cluster configuration to peers.toml

Types

type ActionSchema

// ActionSchema defines the complete schema of a single action.
// All fields are JSON-serialized; the map and slice fields carry
// `omitempty`, so they are left out of the JSON output when empty.
type ActionSchema struct {
	// Name is serialized under the JSON key "name".
	Name        string                   `json:"name"`
	// Description is serialized under the JSON key "description".
	Description string                   `json:"description"`
	// Parameters is a free-form map; presumably describes the action's
	// input parameters — TODO confirm the expected shape.
	Parameters  map[string]interface{}   `json:"parameters,omitempty"`
	// Returns is a free-form map; presumably describes the action's
	// return value — TODO confirm the expected shape.
	Returns     map[string]interface{}   `json:"returns,omitempty"`
	// Examples is a list of free-form maps, one per example.
	Examples    []map[string]interface{} `json:"examples,omitempty"`
}

ActionSchema 定义了单个 action 的完整 schema

type AppConfig

// AppConfig contains the cluster application configuration.
// Each field carries a JSON key and an `env` tag naming an environment
// variable; how the env tag is applied is decided by the loader, which is
// not shown here.
type AppConfig struct {
	// Enabled: JSON key "enabled", env CLUSTER_ENABLED.
	Enabled           bool `json:"enabled" env:"CLUSTER_ENABLED"`
	// Port: JSON key "port", env CLUSTER_UDP_PORT (the UDP port, per the env name).
	Port              int  `json:"port" env:"CLUSTER_UDP_PORT"`
	// RPCPort: JSON key "rpc_port", env CLUSTER_RPC_PORT.
	RPCPort           int  `json:"rpc_port" env:"CLUSTER_RPC_PORT"`
	// BroadcastInterval: JSON key "broadcast_interval", env CLUSTER_BROADCAST_INTERVAL.
	// NOTE(review): stored as a plain int; the unit (presumably seconds) is
	// not visible here — confirm against the loader.
	BroadcastInterval int  `json:"broadcast_interval" env:"CLUSTER_BROADCAST_INTERVAL"`
}

AppConfig contains the cluster application configuration (loaded from config.cluster.json). It is separate from ClusterConfig (an alias of StaticConfig), which holds the static cluster configuration stored in peers.toml.

func DefaultAppConfig

func DefaultAppConfig() *AppConfig

DefaultAppConfig returns the default cluster application configuration

func LoadAppConfig

func LoadAppConfig(workspace string) (*AppConfig, error)

LoadAppConfig loads the cluster application configuration from workspace/config/config.cluster.json. If the file doesn't exist, it loads the embedded default config instead.

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster represents the bot cluster

func NewCluster

func NewCluster(workspace string) (*Cluster, error)

NewCluster creates a new cluster instance

func (*Cluster) Call

func (c *Cluster) Call(peerID, action string, payload map[string]interface{}) ([]byte, error)

Call makes an RPC call to a peer

func (*Cluster) CallWithContext

func (c *Cluster) CallWithContext(ctx context.Context, peerID, action string, payload map[string]interface{}) ([]byte, error)

CallWithContext makes an RPC call to a peer with context support for cancellation and timeout

func (*Cluster) CleanupTask

func (c *Cluster) CleanupTask(taskID string)

CleanupTask 清理已完成的任务(由续行流程调用)

func (*Cluster) FindPeersByCapability

func (c *Cluster) FindPeersByCapability(capability string) []*Node

FindPeersByCapability returns nodes with a specific capability

func (*Cluster) GetActionsSchema

func (c *Cluster) GetActionsSchema() []interface{}

GetActionsSchema 返回所有可用 actions 的 schema 定义

func (*Cluster) GetActionsSchemaJSON

func (c *Cluster) GetActionsSchemaJSON() ([]byte, error)

GetActionsSchemaJSON 返回 actions schema 的 JSON 格式

func (*Cluster) GetAddress

func (c *Cluster) GetAddress() string

GetAddress returns the RPC address of this node

func (*Cluster) GetAllLocalIPs

func (c *Cluster) GetAllLocalIPs() []string

GetAllLocalIPs returns all local IP addresses (for discovery callback)

func (*Cluster) GetCapabilities

func (c *Cluster) GetCapabilities() []string

GetCapabilities returns all capabilities from all nodes

func (*Cluster) GetCategory

func (c *Cluster) GetCategory() string

GetCategory returns the node category (for discovery callback)

func (*Cluster) GetContinuationStore

func (c *Cluster) GetContinuationStore() *ContinuationStore

GetContinuationStore 暴露续行快照存储给 AgentLoop

func (*Cluster) GetLocalNetworkInterfaces

func (c *Cluster) GetLocalNetworkInterfaces() ([]rpc.LocalNetworkInterface, error)

GetLocalNetworkInterfaces returns local network interfaces (for RPC client)

func (*Cluster) GetLogger

func (c *Cluster) GetLogger() *ClusterLogger

GetLogger returns the cluster logger

func (*Cluster) GetNodeID

func (c *Cluster) GetNodeID() string

GetNodeID returns the node ID

func (*Cluster) GetOnlinePeers

func (c *Cluster) GetOnlinePeers() []interface{}

GetOnlinePeers returns all online nodes as interface slice (for RPC compatibility)

func (*Cluster) GetPeer

func (c *Cluster) GetPeer(peerID string) (interface{}, error)

GetPeer returns a peer node by ID (for RPC client)

func (*Cluster) GetPorts

func (c *Cluster) GetPorts() (udpPort, rpcPort int)

GetPorts returns the configured UDP and RPC ports

func (*Cluster) GetRPCChannel

func (c *Cluster) GetRPCChannel() *channels.RPCChannel

GetRPCChannel returns the RPC channel (may be nil if not configured)

func (*Cluster) GetRPCPort

func (c *Cluster) GetRPCPort() int

GetRPCPort returns the RPC port (for discovery callback)

func (*Cluster) GetRegistry

func (c *Cluster) GetRegistry() interface{}

GetRegistry returns the registry (as interface for RPC compatibility)

func (*Cluster) GetRole

func (c *Cluster) GetRole() string

GetRole returns the node role (for discovery callback)

func (*Cluster) GetTags

func (c *Cluster) GetTags() []string

GetTags returns the node tags (for discovery callback)

func (*Cluster) GetTask

func (c *Cluster) GetTask(taskID string) (*Task, error)

GetTask 获取任务信息(暴露给 AgentLoop 用于续行)

func (*Cluster) GetTaskResultStore

func (c *Cluster) GetTaskResultStore() *TaskResultStore

GetTaskResultStore 暴露任务结果存储给 PeerChatHandler

func (*Cluster) GetTaskResultStorer

func (c *Cluster) GetTaskResultStorer() rpc.TaskResultStorer

GetTaskResultStorer 满足 rpc.Cluster 接口(返回 TaskResultStorer)

func (*Cluster) GetWorkspace

func (c *Cluster) GetWorkspace() string

GetWorkspace returns the workspace path

func (*Cluster) HandleDiscoveredNode

func (c *Cluster) HandleDiscoveredNode(nodeID, name string, addresses []string, rpcPort int, role, category string, tags []string, capabilities []string)

HandleDiscoveredNode handles a node discovered via UDP broadcast

func (*Cluster) HandleNodeOffline

func (c *Cluster) HandleNodeOffline(nodeID, reason string)

HandleNodeOffline handles a node going offline

func (*Cluster) HandleTaskCompleteForTest

func (c *Cluster) HandleTaskCompleteForTest(taskID string)

HandleTaskCompleteForTest 暴露 handleTaskComplete 给测试代码

func (*Cluster) IsRunning

func (c *Cluster) IsRunning() bool

IsRunning returns true if the cluster is running

func (*Cluster) LogDebug

func (c *Cluster) LogDebug(msg string, args ...interface{})

LogDebug logs a debug message (for discovery callback)

func (*Cluster) LogError

func (c *Cluster) LogError(msg string, args ...interface{})

LogError logs an error message (for discovery callback)

func (*Cluster) LogInfo

func (c *Cluster) LogInfo(msg string, args ...interface{})

LogInfo logs an info message (for discovery callback)

func (*Cluster) LogRPCDebug

func (c *Cluster) LogRPCDebug(msg string, args ...interface{})

LogRPCDebug logs an RPC debug message (for RPC callback)

func (*Cluster) LogRPCError

func (c *Cluster) LogRPCError(msg string, args ...interface{})

LogRPCError logs an RPC error message (for RPC callback)

func (*Cluster) LogRPCInfo

func (c *Cluster) LogRPCInfo(msg string, args ...interface{})

LogRPCInfo logs an RPC info message (for RPC callback)

func (*Cluster) RegisterBasicHandlers

func (c *Cluster) RegisterBasicHandlers() error

RegisterBasicHandlers registers the basic RPC handlers (default and custom). It can be called directly in daemon mode, where RPCChannel is not available.

func (*Cluster) RegisterRPCHandler

func (c *Cluster) RegisterRPCHandler(action string, handler func(payload map[string]interface{}) (map[string]interface{}, error)) error

RegisterRPCHandler registers an RPC handler for an action. This allows external code to register custom RPC handlers.

func (*Cluster) SetMessageBus

func (c *Cluster) SetMessageBus(bus *bus.MessageBus)

SetMessageBus 设置消息总线(Phase 2: 由 AgentLoop 在 setupClusterRPCChannel 中调用)

func (*Cluster) SetPorts

func (c *Cluster) SetPorts(udpPort, rpcPort int)

SetPorts sets the UDP and RPC ports

func (*Cluster) SetRPCChannel

func (c *Cluster) SetRPCChannel(rpcCh *channels.RPCChannel)

SetRPCChannel sets the RPC channel and triggers LLM handler registration. It is called by loop.go after creating the RPCChannel.

Thread safety: this method holds its lock only briefly and releases it before registering handlers, in order to avoid deadlock:

  • Acquires lock only to set c.rpcChannel and read state
  • Releases lock before calling registerPeerChatHandlers()
  • This avoids deadlock: registerPeerChatHandlers() internally calls RegisterRPCHandler() which tries to acquire a read lock while we might be holding a write lock

There's a tiny race window between Unlock() and registerPeerChatHandlers() in which Stop() or a server shutdown could occur. This is acceptable because: (1) the window is extremely short (microseconds); (2) in the worst case the LLM handlers don't get registered, but no deadlock occurs; and (3) RegisterRPCHandler() has its own state checks and will return an error if the cluster is not running.

func (*Cluster) SetTaskManagerForTest

func (c *Cluster) SetTaskManagerForTest(tm *TaskManager)

SetTaskManagerForTest 设置 TaskManager(测试专用,允许注入自定义 TaskManager)

func (*Cluster) Start

func (c *Cluster) Start() error

Start starts the cluster

func (*Cluster) Stop

func (c *Cluster) Stop() error

Stop stops the cluster

func (*Cluster) SubmitTask

func (c *Cluster) SubmitTask(ctx context.Context, peerID, action string, payload map[string]interface{}, channel, chatID string) (string, error)

SubmitTask 提交异步 RPC 任务。发送请求到目标节点(短同步调用获取 ACK),返回 taskID。channel 和 chatID 记录发起方的通道信息,用于续行通知路由。

func (*Cluster) SyncToDisk

func (c *Cluster) SyncToDisk() error

SyncToDisk saves the current state to state.toml (dynamic state only)

type ClusterConfig

// ClusterConfig is a legacy alias of StaticConfig, kept for backward compatibility.
type ClusterConfig = StaticConfig

Legacy aliases for backward compatibility

type ClusterLogger

type ClusterLogger struct {
	// contains filtered or unexported fields
}

ClusterLogger manages logging for the cluster module

func NewClusterLogger

func NewClusterLogger(workspace string) (*ClusterLogger, error)

NewClusterLogger creates a new cluster logger

func (*ClusterLogger) Close

func (l *ClusterLogger) Close() error

Close closes all log files

func (*ClusterLogger) DiscoveryDebug

func (l *ClusterLogger) DiscoveryDebug(format string, args ...interface{})

func (*ClusterLogger) DiscoveryError

func (l *ClusterLogger) DiscoveryError(format string, args ...interface{})

func (*ClusterLogger) DiscoveryInfo

func (l *ClusterLogger) DiscoveryInfo(format string, args ...interface{})

Discovery logging methods

func (*ClusterLogger) LogRPCDebug

func (l *ClusterLogger) LogRPCDebug(msg string, args ...interface{})

LogRPCDebug logs an RPC debug message (aliases for handlers.Logger interface)

func (*ClusterLogger) LogRPCError

func (l *ClusterLogger) LogRPCError(msg string, args ...interface{})

LogRPCError logs an RPC error message (aliases for handlers.Logger interface)

func (*ClusterLogger) LogRPCInfo

func (l *ClusterLogger) LogRPCInfo(msg string, args ...interface{})

LogRPCInfo logs an RPC info message (aliases for handlers.Logger interface)

func (*ClusterLogger) RPCDebug

func (l *ClusterLogger) RPCDebug(format string, args ...interface{})

func (*ClusterLogger) RPCError

func (l *ClusterLogger) RPCError(format string, args ...interface{})

func (*ClusterLogger) RPCInfo

func (l *ClusterLogger) RPCInfo(format string, args ...interface{})

RPC logging methods

type ClusterMeta

// ClusterMeta contains cluster metadata. All fields are TOML-serialized
// under the keys given in their tags.
type ClusterMeta struct {
	// ID is stored under the TOML key "id".
	ID            string    `toml:"id"`
	// AutoDiscovery is stored under the TOML key "auto_discovery".
	AutoDiscovery bool      `toml:"auto_discovery"`
	// LastUpdated is stored under the TOML key "last_updated".
	LastUpdated   time.Time `toml:"last_updated"`
	RPCAuthToken  string    `toml:"rpc_auth_token"` // RPC authentication token
}

ClusterMeta contains cluster metadata

type ContinuationSnapshot

// ContinuationSnapshot is a continuation snapshot (persisted to disk).
type ContinuationSnapshot struct {
	// TaskID is serialized under the JSON key "task_id".
	TaskID     string          `json:"task_id"`
	Messages   json.RawMessage `json:"messages"`     // raw JSON of []providers.Message
	ToolCallID string          `json:"tool_call_id"` // ID of the tool call that triggered the async task
	Channel    string          `json:"channel"`      // originating channel
	ChatID     string          `json:"chat_id"`      // originating chat ID
	CreatedAt  time.Time       `json:"created_at"`   // creation time (used for cleanup)
}

ContinuationSnapshot 续行快照(存储到磁盘)

type ContinuationStore

type ContinuationStore struct {
	// contains filtered or unexported fields
}

ContinuationStore 续行快照文件存储

func NewContinuationStore

func NewContinuationStore(workspace string) (*ContinuationStore, error)

NewContinuationStore 创建续行存储

func (*ContinuationStore) CleanupOld

func (s *ContinuationStore) CleanupOld(maxAge time.Duration) (int, error)

CleanupOld 清理超过 maxAge 的快照文件,返回清理数量

func (*ContinuationStore) Delete

func (s *ContinuationStore) Delete(taskID string) error

Delete 删除续行快照文件

func (*ContinuationStore) ListPending

func (s *ContinuationStore) ListPending() ([]string, error)

ListPending 列出所有待处理的快照 taskID(用于启动时恢复)

func (*ContinuationStore) Load

func (s *ContinuationStore) Load(taskID string) (*ContinuationSnapshot, error)

Load 从磁盘加载续行快照

func (*ContinuationStore) Save

func (s *ContinuationStore) Save(snapshot *ContinuationSnapshot) error

Save 保存续行快照到磁盘(原子写入:先写 tmp 再 rename)

type DynamicState

// DynamicState represents the dynamic cluster state (state.toml).
type DynamicState struct {
	// Cluster holds the cluster metadata (TOML table "cluster").
	Cluster    ClusterMeta  `toml:"cluster"`
	// LocalNode holds the node information stored under "local_node".
	LocalNode  NodeInfo     `toml:"local_node"`
	// Discovered lists peer entries stored under "discovered".
	Discovered []PeerConfig `toml:"discovered"`
	// LastSync is stored under "last_sync"; presumably the time of the
	// last write to disk — TODO confirm.
	LastSync   time.Time    `toml:"last_sync"`
}

DynamicState represents the dynamic cluster state (state.toml). This file is automatically managed by the cluster module. It contains runtime information about discovered peers.

func LoadDynamicState

func LoadDynamicState(statePath string) (*DynamicState, error)

LoadDynamicState loads the dynamic cluster state from state.toml

type InMemoryTaskStore

type InMemoryTaskStore struct {
	// contains filtered or unexported fields
}

InMemoryTaskStore 内存实现(Phase 1)

func NewInMemoryTaskStore

func NewInMemoryTaskStore() *InMemoryTaskStore

NewInMemoryTaskStore 创建内存任务存储

func (*InMemoryTaskStore) Create

func (s *InMemoryTaskStore) Create(task *Task) error

Create 创建任务记录

func (*InMemoryTaskStore) Delete

func (s *InMemoryTaskStore) Delete(taskID string) error

Delete 删除任务记录

func (*InMemoryTaskStore) Get

func (s *InMemoryTaskStore) Get(taskID string) (*Task, error)

Get 获取任务记录

func (*InMemoryTaskStore) ListByStatus

func (s *InMemoryTaskStore) ListByStatus(status TaskStatus) ([]*Task, error)

ListByStatus 按状态列出任务

func (*InMemoryTaskStore) UpdateResult

func (s *InMemoryTaskStore) UpdateResult(taskID string, result *TaskResult) error

UpdateResult 更新任务结果

type Logger

// Logger represents logging functions for RPC operations.
// Each method takes a format string followed by its arguments.
type Logger interface {
	// RPCInfo logs an RPC message at info level.
	RPCInfo(format string, args ...interface{})
	// RPCError logs an RPC message at error level.
	RPCError(format string, args ...interface{})
	// RPCDebug logs an RPC message at debug level.
	RPCDebug(format string, args ...interface{})
}

Logger represents logging functions for RPC operations

type NetworkInterface

// NetworkInterface represents a local network interface with its subnet.
// NOTE(review): all three fields are plain strings; their textual format
// (e.g. dotted-decimal) is not visible here — confirm against the producer.
type NetworkInterface struct {
	// IP is the interface's IP address.
	IP        string
	// Mask is the interface's subnet mask.
	Mask      string
	NetworkIP string // The network address (IP & Mask)
}

NetworkInterface represents a local network interface with its subnet

func GetLocalNetworkInterfaces

func GetLocalNetworkInterfaces() ([]NetworkInterface, error)

GetLocalNetworkInterfaces returns all local network interfaces with their subnet masks

type Node

// Node represents a bot in the cluster.
//
// The first group of fields carries TOML keys; the "Runtime state" group is
// tagged `toml:"-"`, the conventional marker for fields excluded from TOML
// encoding.
type Node struct {
	ID           string   `toml:"id"`
	Name         string   `toml:"name"`
	Address      string   `toml:"address"`   // Deprecated: Primary IP:Port (for backward compatibility)
	Addresses    []string `toml:"addresses"` // List of all IP addresses
	RPCPort      int      `toml:"rpc_port"`  // RPC port number
	Role         string   `toml:"role"`      // Cluster role
	Category     string   `toml:"category"`  // Business category
	Tags         []string `toml:"tags"`      // Custom tags
	Capabilities []string `toml:"capabilities"`
	Priority     int      `toml:"priority"`

	// Runtime state
	Status          NodeStatus `toml:"-"`
	LastSeen        time.Time  `toml:"-"`
	TasksCompleted  int        `toml:"-"`
	SuccessRate     float64    `toml:"-"`
	// NOTE(review): unit not stated here; presumably milliseconds, matching
	// PeerStatus.AvgResponseTime — confirm.
	AvgResponseTime int        `toml:"-"`
	LastError       string     `toml:"-"`
	// contains filtered or unexported fields
}

Node represents a bot in the cluster

func (*Node) GetAddress

func (n *Node) GetAddress() string

GetAddress returns the node address (for RPC interface)

func (*Node) GetAddresses

func (n *Node) GetAddresses() []string

GetAddresses returns all IP addresses of the node (for RPC interface)

func (*Node) GetCapabilities

func (n *Node) GetCapabilities() []string

GetCapabilities returns the node capabilities (for RPC interface)

func (*Node) GetID

func (n *Node) GetID() string

GetID returns the node ID (for RPC interface)

func (*Node) GetName

func (n *Node) GetName() string

GetName returns the node name (for RPC interface)

func (*Node) GetNodeStatus

func (n *Node) GetNodeStatus() NodeStatus

GetNodeStatus returns the current status as NodeStatus type

func (*Node) GetRPCPort

func (n *Node) GetRPCPort() int

GetRPCPort returns the RPC port of the node (for RPC interface)

func (*Node) GetStatus

func (n *Node) GetStatus() string

GetStatus returns the current status as string

func (*Node) GetUptime

func (n *Node) GetUptime() time.Duration

GetUptime returns the uptime duration

func (*Node) HasCapability

func (n *Node) HasCapability(capability string) bool

HasCapability checks if the node has a specific capability

func (*Node) IsOnline

func (n *Node) IsOnline() bool

IsOnline returns true if the node is online

func (*Node) MarkOffline

func (n *Node) MarkOffline(reason string)

MarkOffline marks the node as offline

func (*Node) SetStatus

func (n *Node) SetStatus(status NodeStatus)

SetStatus updates the node status

func (*Node) String

func (n *Node) String() string

String returns a string representation of the node

func (*Node) ToConfig

func (n *Node) ToConfig() PeerConfig

ToConfig converts Node to PeerConfig for TOML serialization

func (*Node) UpdateLastSeen

func (n *Node) UpdateLastSeen()

UpdateLastSeen updates the last seen timestamp

type NodeInfo

// NodeInfo contains information about the current node.
// All fields are TOML-serialized under the keys given in their tags.
type NodeInfo struct {
	ID           string   `toml:"id"`
	Name         string   `toml:"name"`
	Address      string   `toml:"address"`
	Role         string   `toml:"role"`     // Cluster role: manager, coordinator, worker, observer, standby
	Category     string   `toml:"category"` // Business category: design, development, testing, ops, deployment, analysis, general
	Tags         []string `toml:"tags"`     // Custom tags for flexible classification
	// Capabilities is stored under the TOML key "capabilities".
	Capabilities []string `toml:"capabilities"`
}

NodeInfo contains information about the current node

type NodeStatus

// NodeStatus represents the current status of a node as a string value.
type NodeStatus string

NodeStatus represents the current status of a node

// Values a NodeStatus can take.
const (
	// StatusOnline is the NodeStatus value "online".
	StatusOnline  NodeStatus = "online"
	// StatusOffline is the NodeStatus value "offline".
	StatusOffline NodeStatus = "offline"
	// StatusUnknown is the NodeStatus value "unknown".
	StatusUnknown NodeStatus = "unknown"
)

type PeerConfig

// PeerConfig represents a peer node configuration.
// All fields are TOML-serialized under the keys given in their tags.
type PeerConfig struct {
	ID           string     `toml:"id"`
	Name         string     `toml:"name"`
	Address      string     `toml:"address"`   // Deprecated: Primary address for backward compatibility
	Addresses    []string   `toml:"addresses"` // List of all IP addresses
	RPCPort      int        `toml:"rpc_port"`  // RPC port number
	Role         string     `toml:"role"`      // Cluster role: manager, coordinator, worker, observer, standby
	Category     string     `toml:"category"`  // Business category: design, development, testing, ops, deployment, analysis, general
	Tags         []string   `toml:"tags"`      // Custom tags for flexible classification
	Capabilities []string   `toml:"capabilities"`
	Priority     int        `toml:"priority"`
	// Enabled is stored under the TOML key "enabled".
	Enabled      bool       `toml:"enabled"`
	// Status holds the peer's runtime status (TOML table "status").
	Status       PeerStatus `toml:"status"`
}

PeerConfig represents a peer node configuration

type PeerStatus

// PeerStatus contains the runtime status of a peer.
// All fields are TOML-serialized under the keys given in their tags.
type PeerStatus struct {
	// State is a free-form string; presumably one of the NodeStatus
	// values ("online", "offline", "unknown") — TODO confirm.
	State           string    `toml:"state"`
	LastSeen        time.Time `toml:"last_seen"`
	Uptime          string    `toml:"uptime"` // Human-readable uptime
	TasksCompleted  int       `toml:"tasks_completed"`
	SuccessRate     float64   `toml:"success_rate"`
	AvgResponseTime int       `toml:"avg_response_time"` // milliseconds
	LastError       string    `toml:"last_error"`
}

PeerStatus contains runtime status of a peer

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages the cluster node registry

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new registry

func (*Registry) AddOrUpdate

func (r *Registry) AddOrUpdate(node *Node)

AddOrUpdate adds a new node or updates an existing one

func (*Registry) CheckTimeouts

func (r *Registry) CheckTimeouts(timeout time.Duration) []string

CheckTimeouts checks all nodes and marks as offline those that have not been seen within the given timeout

func (*Registry) Count

func (r *Registry) Count() int

Count returns the total number of nodes

func (*Registry) FindByCapability

func (r *Registry) FindByCapability(capability string) []*Node

FindByCapability returns nodes that have a specific capability

func (*Registry) FindByCapabilityOnline

func (r *Registry) FindByCapabilityOnline(capability string) []*Node

FindByCapabilityOnline returns online nodes that have a specific capability

func (*Registry) Get

func (r *Registry) Get(nodeID string) *Node

Get retrieves a node by ID

func (*Registry) GetAll

func (r *Registry) GetAll() []*Node

GetAll returns all nodes

func (*Registry) GetCapabilities

func (r *Registry) GetCapabilities() []string

GetCapabilities returns all unique capabilities from all nodes

func (*Registry) GetOnline

func (r *Registry) GetOnline() []*Node

GetOnline returns all online nodes

func (*Registry) MarkOffline

func (r *Registry) MarkOffline(nodeID string, reason string)

MarkOffline marks a node as offline

func (*Registry) OnlineCount

func (r *Registry) OnlineCount() int

OnlineCount returns the number of online nodes

func (*Registry) Remove

func (r *Registry) Remove(nodeID string)

Remove removes a node from the registry

type StaticConfig

// StaticConfig represents the static cluster configuration (peers.toml).
type StaticConfig struct {
	// Cluster holds the cluster metadata (TOML table "cluster").
	Cluster ClusterMeta  `toml:"cluster"`
	// Node holds the node information stored under "node".
	Node    NodeInfo     `toml:"node"`
	// Peers lists the peer entries stored under "peers".
	Peers   []PeerConfig `toml:"peers"`
}

StaticConfig represents the static cluster configuration (peers.toml). This file is created during onboarding and contains the current node's information. Users can manually edit this file to add known peers.

func CreateStaticConfig

func CreateStaticConfig(nodeID, nodeName, address string) *StaticConfig

CreateStaticConfig creates a default static configuration for the current node

func DefaultConfig

func DefaultConfig(nodeID string) *StaticConfig

DefaultConfig creates a default cluster configuration (legacy alias)

func LoadConfig

func LoadConfig(configPath string) (*StaticConfig, error)

LoadConfig loads the cluster configuration (legacy alias)

func LoadOrCreateConfig

func LoadOrCreateConfig(configPath string, nodeID string) (*StaticConfig, error)

LoadOrCreateConfig loads existing config or creates a default one (legacy alias)

func LoadStaticConfig

func LoadStaticConfig(configPath string) (*StaticConfig, error)

LoadStaticConfig loads the static cluster configuration from peers.toml

type Task

// Task represents one asynchronous RPC task.
type Task struct {
	ID          string                 `json:"id"`           // unique task ID
	Action      string                 `json:"action"`       // RPC action (peer_chat)
	PeerID      string                 `json:"peer_id"`      // target node ID
	Payload     map[string]interface{} `json:"payload"`      // original request payload
	Status      TaskStatus             `json:"status"`       // current status
	CreatedAt   time.Time              `json:"created_at"`   // creation time
	CompletedAt *time.Time             `json:"completed_at"` // completion time

	// Result
	Response string                 `json:"response,omitempty"` // LLM reply content
	Result   map[string]interface{} `json:"result,omitempty"`   // callback result
	Error    string                 `json:"error,omitempty"`    // error message

	// Phase 2: originating channel info (used to route continuation notifications)
	OriginalChannel string `json:"original_channel,omitempty"` // initiator's channel (e.g. "web")
	OriginalChatID  string `json:"original_chat_id,omitempty"` // initiator's chat ID
}

Task 表示一个异步 RPC 任务

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager 任务生命周期管理

func NewTaskManager

func NewTaskManager(cleanupInterval time.Duration) *TaskManager

NewTaskManager 创建任务管理器

func (*TaskManager) CompleteCallback

func (tm *TaskManager) CompleteCallback(taskID, status, response, errMsg string) error

CompleteCallback 实现 handlers.TaskCompleter 接口。将基本类型的回调参数转换为 TaskResult 后调用 CompleteTask。

func (*TaskManager) CompleteTask

func (tm *TaskManager) CompleteTask(taskID string, result *TaskResult) error

CompleteTask 标记任务完成(由 CallbackHandler 调用)

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(taskID string) (*Task, error)

GetTask 获取任务信息

func (*TaskManager) ListPendingTasks

func (tm *TaskManager) ListPendingTasks() ([]*Task, error)

ListPendingTasks 获取所有 pending 状态的任务(H4: recoveryLoop 使用)

func (*TaskManager) SetOnComplete

func (tm *TaskManager) SetOnComplete(fn func(taskID string))

SetOnComplete 设置任务完成回调(Phase 2)

func (*TaskManager) Start

func (tm *TaskManager) Start()

Start 启动 TaskManager 的清理协程

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop 停止 TaskManager

func (*TaskManager) Submit

func (tm *TaskManager) Submit(task *Task) error

Submit 提交任务

type TaskResult

// TaskResult is the result delivered by a task callback.
type TaskResult struct {
	// TaskID is serialized under the JSON key "task_id".
	TaskID   string                 `json:"task_id"`
	Status   string                 `json:"status"` // success | error
	// Response is serialized under the JSON key "response".
	Response string                 `json:"response"`
	// Result is omitted from the JSON output when empty.
	Result   map[string]interface{} `json:"result,omitempty"`
	// Error is omitted from the JSON output when empty.
	Error    string                 `json:"error,omitempty"`
}

TaskResult 任务回调结果

type TaskResultEntry

// TaskResultEntry is a task result record kept on the B side
// (B being the node that processes the task).
type TaskResultEntry struct {
	TaskID       string    `json:"task_id"`
	Status       string    `json:"status"`        // "running" | "done"
	ResultStatus string    `json:"result_status"` // "success" | "error" (set only when Status is "done")
	Response     string    `json:"response,omitempty"`
	Error        string    `json:"error,omitempty"`
	// SourceNode is serialized under the JSON key "source_node";
	// presumably the ID of the node that submitted the task — TODO confirm.
	SourceNode   string    `json:"source_node"`
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
}

TaskResultEntry B 端任务结果记录

type TaskResultIndex

// TaskResultIndex is the on-disk index of task results.
type TaskResultIndex struct {
	// Tasks maps a string key to its entry; presumably keyed by task ID —
	// TODO confirm.
	Tasks map[string]*TaskResultEntry `json:"tasks"`
}

TaskResultIndex 磁盘索引

type TaskResultStore

type TaskResultStore struct {
	// contains filtered or unexported fields
}

TaskResultStore B 端任务结果持久化存储。running 状态仅存内存(进程重启后丢失,A 端会再次询问);done 状态写磁盘(数据文件 + 索引文件)。

func NewTaskResultStore

func NewTaskResultStore(workspace string) (*TaskResultStore, error)

NewTaskResultStore 创建任务结果存储

func (*TaskResultStore) Delete

func (s *TaskResultStore) Delete(taskID string) error

Delete 删除任务结果(数据文件 + 索引 + 磁盘)

func (*TaskResultStore) Get

func (s *TaskResultStore) Get(taskID string) *TaskResultEntry

Get 查询任务结果(先查 running,再查索引)

func (*TaskResultStore) SetResult

func (s *TaskResultStore) SetResult(taskID, resultStatus, response, errMsg, sourceNode string) error

SetResult 写入完成结果(数据文件 + 索引 + 磁盘),同时清理 running 标记

func (*TaskResultStore) SetRunning

func (s *TaskResultStore) SetRunning(taskID, sourceNode string)

SetRunning 标记任务为 running(仅内存)

type TaskStatus

// TaskStatus is the status of a task, represented as a string value.
type TaskStatus string

TaskStatus 任务状态

// Values a TaskStatus can take. "B" denotes the node that processes the task.
const (
	TaskPending   TaskStatus = "pending"   // submitted, waiting for B to process
	TaskCompleted TaskStatus = "completed" // B has returned a result
	TaskFailed    TaskStatus = "failed"    // B failed to process, or the callback failed
	TaskCancelled TaskStatus = "cancelled" // actively cancelled (Phase 2)
)

type TaskStore

// TaskStore is the task storage interface (in Phase 2 it can be replaced
// by a persistent implementation).
type TaskStore interface {
	// Create creates a task record.
	Create(task *Task) error
	// Get retrieves a task record by ID.
	Get(taskID string) (*Task, error)
	// UpdateResult updates the result of a task.
	UpdateResult(taskID string, result *TaskResult) error
	// Delete deletes a task record.
	Delete(taskID string) error
	// ListByStatus lists tasks by status.
	ListByStatus(status TaskStatus) ([]*Task, error)
}

TaskStore 任务存储接口(Phase 2 可替换为持久化实现)

Directories

Path Synopsis
Package handlers provides RPC handlers for the cluster module.
Package handlers provides RPC handlers for the cluster module.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL