Documentation
¶
Index ¶
- type AccessPolicy
- type AgentConnection
- type ConflictDetector
- type ConflictInfo
- type ConflictResolution
- type ConflictRule
- type ConflictType
- type ConnectionStatus
- type ConnectionType
- type ConsistencyLevel
- type DatabasePersistence
- func (dp *DatabasePersistence) Backup(ctx context.Context, path string) error
- func (dp *DatabasePersistence) Close() error
- func (dp *DatabasePersistence) DeleteState(ctx context.Context, stateID string) error
- func (dp *DatabasePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
- func (dp *DatabasePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
- func (dp *DatabasePersistence) Restore(ctx context.Context, path string) error
- func (dp *DatabasePersistence) SaveState(ctx context.Context, state *SharedState) error
- func (dp *DatabasePersistence) UpdateState(ctx context.Context, state *SharedState) error
- type DefaultConflictRule
- type Event
- type EventBus
- type EventHandler
- type EventType
- type FilePersistence
- func (fp *FilePersistence) Backup(ctx context.Context, path string) error
- func (fp *FilePersistence) Close() error
- func (fp *FilePersistence) DeleteState(ctx context.Context, stateID string) error
- func (fp *FilePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
- func (fp *FilePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
- func (fp *FilePersistence) Restore(ctx context.Context, path string) error
- func (fp *FilePersistence) SaveState(ctx context.Context, state *SharedState) error
- func (fp *FilePersistence) UpdateState(ctx context.Context, state *SharedState) error
- type InMemoryEventBus
- func (bus *InMemoryEventBus) Close() error
- func (bus *InMemoryEventBus) Publish(event StateEvent) error
- func (bus *InMemoryEventBus) PublishAsync(event StateEvent) error
- func (bus *InMemoryEventBus) Subscribe(eventType EventType, handler EventHandler) error
- func (bus *InMemoryEventBus) Unsubscribe(eventType EventType, handler EventHandler) error
- type Member
- type MemoryPersistence
- func (mp *MemoryPersistence) Backup(ctx context.Context, path string) error
- func (mp *MemoryPersistence) Close() error
- func (mp *MemoryPersistence) DeleteState(ctx context.Context, stateID string) error
- func (mp *MemoryPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
- func (mp *MemoryPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
- func (mp *MemoryPersistence) Restore(ctx context.Context, path string) error
- func (mp *MemoryPersistence) SaveState(ctx context.Context, state *SharedState) error
- func (mp *MemoryPersistence) UpdateState(ctx context.Context, state *SharedState) error
- type Message
- type OperationType
- type PendingOperation
- type PersistenceConfig
- type PersistenceLayer
- type PersistenceType
- type RedisPersistence
- func (rp *RedisPersistence) Backup(ctx context.Context, path string) error
- func (rp *RedisPersistence) Close() error
- func (rp *RedisPersistence) DeleteState(ctx context.Context, stateID string) error
- func (rp *RedisPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
- func (rp *RedisPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
- func (rp *RedisPersistence) Restore(ctx context.Context, path string) error
- func (rp *RedisPersistence) SaveState(ctx context.Context, state *SharedState) error
- func (rp *RedisPersistence) UpdateState(ctx context.Context, state *SharedState) error
- type Role
- type SharedState
- type Stars
- func (s *Stars) Broadcast(ctx context.Context, text string) error
- func (s *Stars) History() []Message
- func (s *Stars) ID() string
- func (s *Stars) Join(agentID string, role Role) error
- func (s *Stars) Leave(agentID string) error
- func (s *Stars) Members() []Member
- func (s *Stars) Name() string
- func (s *Stars) Run(ctx context.Context, task string) iter.Seq2[*Event, error]
- func (s *Stars) Send(ctx context.Context, from, to, text string) error
- func (s *Stars) Size() int
- type StateEvent
- type StateEventHandler
- type StateManager
- func (sm *StateManager) CreateState(ctx context.Context, stateID, name string, stateType StateType, ...) (*SharedState, error)
- func (sm *StateManager) DeleteState(ctx context.Context, stateID, agentID string) error
- func (sm *StateManager) GetMetrics() *StateMetrics
- func (sm *StateManager) GetState(stateID string) (*SharedState, error)
- func (sm *StateManager) UpdateState(ctx context.Context, stateID string, agentID string, ...) error
- type StateManagerConfig
- type StateMetrics
- type StateSyncManager
- func (ssm *StateSyncManager) ConnectAgent(agentID, name, address string, connType ConnectionType, states []string) (*AgentConnection, error)
- func (ssm *StateSyncManager) CreateSyncTask(taskType SyncTaskType, stateID string, targetAgents []string, ...) (*SyncTask, error)
- func (ssm *StateSyncManager) DisconnectAgent(agentID string) error
- func (ssm *StateSyncManager) GetAgentConnection(agentID string) (*AgentConnection, error)
- func (ssm *StateSyncManager) GetMetrics() *SyncMetrics
- func (ssm *StateSyncManager) ListAgentConnections() map[string]*AgentConnection
- type StateType
- type StateUpdate
- type SyncConfig
- type SyncMetrics
- type SyncStatus
- type SyncTask
- type SyncTaskType
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccessPolicy ¶
type AccessPolicy struct {
ReadAllow []string `json:"read_allow"` // 允许读取的Agent
WriteAllow []string `json:"write_allow"` // 允许写入的Agent
DeleteAllow []string `json:"delete_allow"` // 允许删除的Agent
AdminAllow []string `json:"admin_allow"` // 允许管理的Agent
Public bool `json:"public"` // 是否公开
}
AccessPolicy 访问策略
type AgentConnection ¶
type AgentConnection struct {
ID string `json:"id"`
Name string `json:"name"`
Address string `json:"address"`
SubscribedStates []string `json:"subscribed_states"`
ConnectionType ConnectionType `json:"connection_type"`
Status ConnectionStatus `json:"status"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Metadata map[string]interface{} `json:"metadata"`
// 同步状态
SyncStatus map[string]SyncStatus `json:"sync_status"`
PendingUpdates []StateUpdate `json:"pending_updates"`
ConflictCount int64 `json:"conflict_count"`
// contains filtered or unexported fields
}
AgentConnection Agent连接
type ConflictDetector ¶
type ConflictDetector struct {
// contains filtered or unexported fields
}
ConflictDetector 冲突检测器
func NewConflictDetector ¶
func NewConflictDetector(eventBus EventBus) *ConflictDetector
NewConflictDetector 创建冲突检测器
func (*ConflictDetector) AddRule ¶
func (cd *ConflictDetector) AddRule(rule ConflictRule)
AddRule 添加冲突规则
func (*ConflictDetector) DetectConflict ¶
func (cd *ConflictDetector) DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)
DetectConflict 检测冲突
func (*ConflictDetector) ResolveConflict ¶
func (cd *ConflictDetector) ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)
ResolveConflict 解决冲突
type ConflictInfo ¶
type ConflictInfo struct {
ConflictID string `json:"conflict_id"`
Type ConflictType `json:"type"`
Description string `json:"description"`
States map[string]*SharedState `json:"states"`
Operations []PendingOperation `json:"operations"`
Resolution ConflictResolution `json:"resolution"`
ResolvedAt time.Time `json:"resolved_at,omitempty"`
ResolvedBy string `json:"resolved_by,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
}
ConflictInfo 冲突信息
type ConflictResolution ¶
type ConflictResolution string
ConflictResolution 冲突解决策略
const ( ConflictResolutionLastWrite ConflictResolution = "last_write" // 最后写入优先 ConflictResolutionMerge ConflictResolution = "merge" // 合并冲突 ConflictResolutionReject ConflictResolution = "reject" // 拒绝冲突 ConflictResolutionCustom ConflictResolution = "custom" // 自定义策略 )
type ConflictRule ¶
type ConflictRule interface {
DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)
ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)
}
ConflictRule 冲突规则
type ConflictType ¶
type ConflictType string
ConflictType 冲突类型
const ( ConflictTypeWriteWrite ConflictType = "write_write" // 写-写冲突 ConflictTypeReadWrite ConflictType = "read_write" // 读-写冲突 ConflictTypeVersion ConflictType = "version" // 版本冲突 ConflictTypeSchema ConflictType = "schema" // 模式冲突 ConflictTypeCustom ConflictType = "custom" // 自定义冲突 )
type ConnectionStatus ¶
type ConnectionStatus string
ConnectionStatus 连接状态
const ( ConnectionStatusConnected ConnectionStatus = "connected" // 已连接 ConnectionStatusDisconnected ConnectionStatus = "disconnected" // 已断开 ConnectionStatusConnecting ConnectionStatus = "connecting" // 连接中 ConnectionStatusError ConnectionStatus = "error" // 错误 ConnectionStatusReconnecting ConnectionStatus = "reconnecting" // 重连中 )
type ConnectionType ¶
type ConnectionType string
ConnectionType 连接类型
const ( ConnectionTypePush ConnectionType = "push" // 推送模式 ConnectionTypePull ConnectionType = "pull" // 拉取模式 ConnectionTypeBidirectional ConnectionType = "bidirectional" // 双向模式 )
type ConsistencyLevel ¶
type ConsistencyLevel string
ConsistencyLevel 一致性级别
const ( ConsistencyLevelStrong ConsistencyLevel = "strong" // 强一致性 ConsistencyLevelWeak ConsistencyLevel = "weak" // 弱一致性 ConsistencyLevelEventual ConsistencyLevel = "eventual" // 最终一致性 )
type DatabasePersistence ¶
type DatabasePersistence struct {
// contains filtered or unexported fields
}
DatabasePersistence 数据库持久化实现
func NewDatabasePersistence ¶
func NewDatabasePersistence(config *PersistenceConfig) (*DatabasePersistence, error)
NewDatabasePersistence 创建数据库持久化
func (*DatabasePersistence) Backup ¶
func (dp *DatabasePersistence) Backup(ctx context.Context, path string) error
Backup 备份数据库到指定路径
func (*DatabasePersistence) DeleteState ¶
func (dp *DatabasePersistence) DeleteState(ctx context.Context, stateID string) error
DeleteState 从数据库删除状态
func (*DatabasePersistence) ListStates ¶
func (dp *DatabasePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
ListStates 列出数据库中的状态
func (*DatabasePersistence) LoadState ¶
func (dp *DatabasePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
LoadState 从数据库加载状态
func (*DatabasePersistence) Restore ¶
func (dp *DatabasePersistence) Restore(ctx context.Context, path string) error
Restore 从指定路径恢复数据库
func (*DatabasePersistence) SaveState ¶
func (dp *DatabasePersistence) SaveState(ctx context.Context, state *SharedState) error
SaveState 保存状态到数据库
func (*DatabasePersistence) UpdateState ¶
func (dp *DatabasePersistence) UpdateState(ctx context.Context, state *SharedState) error
UpdateState 更新数据库中的状态
type DefaultConflictRule ¶
type DefaultConflictRule struct{}
DefaultConflictRule 默认冲突规则
func (*DefaultConflictRule) DetectConflict ¶
func (dcr *DefaultConflictRule) DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)
DetectConflict 检测冲突
func (*DefaultConflictRule) ResolveConflict ¶
func (dcr *DefaultConflictRule) ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)
ResolveConflict 解决冲突
type Event ¶
type Event struct {
AgentID string // 产生事件的 Agent ID
Type string // 事件类型
Content string // 事件内容
Time time.Time // 事件时间
}
Event 群星事件
type EventBus ¶
type EventBus interface {
Subscribe(eventType EventType, handler EventHandler) error
Unsubscribe(eventType EventType, handler EventHandler) error
Publish(event StateEvent) error
PublishAsync(event StateEvent) error
Close() error
}
EventBus 事件总线接口
type EventHandler ¶
type EventHandler interface {
Handle(ctx context.Context, event StateEvent) error
}
EventHandler 事件处理器
type EventType ¶
type EventType string
EventType 事件类型
const ( // 状态事件 EventTypeStateCreated EventType = "state_created" EventTypeStateUpdated EventType = "state_updated" EventTypeStateDeleted EventType = "state_deleted" EventTypeStateRead EventType = "state_read" // 同步事件 EventTypeSyncStarted EventType = "sync_started" EventTypeSyncCompleted EventType = "sync_completed" EventTypeSyncFailed EventType = "sync_failed" EventTypeSyncConflict EventType = "sync_conflict" // 冲突事件 EventTypeConflictDetected EventType = "conflict_detected" EventTypeConflictResolved EventType = "conflict_resolved" // Agent事件 EventTypeAgentConnected EventType = "agent_connected" EventTypeAgentDisconnected EventType = "agent_disconnected" EventTypeAgentHeartbeat EventType = "agent_heartbeat" // 系统事件 EventTypeSystemStarted EventType = "system_started" EventTypeSystemShutdown EventType = "system_shutdown" EventTypeConfigChanged EventType = "config_changed" )
type FilePersistence ¶
type FilePersistence struct {
// contains filtered or unexported fields
}
FilePersistence 文件持久化实现
func NewFilePersistence ¶
func NewFilePersistence(config *PersistenceConfig) (*FilePersistence, error)
NewFilePersistence 创建文件持久化
func (*FilePersistence) Backup ¶
func (fp *FilePersistence) Backup(ctx context.Context, path string) error
Backup 备份到指定路径
func (*FilePersistence) DeleteState ¶
func (fp *FilePersistence) DeleteState(ctx context.Context, stateID string) error
DeleteState 从文件删除状态
func (*FilePersistence) ListStates ¶
func (fp *FilePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
ListStates 列出文件中的状态
func (*FilePersistence) LoadState ¶
func (fp *FilePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
LoadState 从文件加载状态
func (*FilePersistence) Restore ¶
func (fp *FilePersistence) Restore(ctx context.Context, path string) error
Restore 从指定路径恢复
func (*FilePersistence) SaveState ¶
func (fp *FilePersistence) SaveState(ctx context.Context, state *SharedState) error
SaveState 保存状态到文件
func (*FilePersistence) UpdateState ¶
func (fp *FilePersistence) UpdateState(ctx context.Context, state *SharedState) error
UpdateState 更新文件中的状态
type InMemoryEventBus ¶
type InMemoryEventBus struct {
// contains filtered or unexported fields
}
InMemoryEventBus 内存事件总线
func NewInMemoryEventBus ¶
func NewInMemoryEventBus(bufferSize int) *InMemoryEventBus
NewInMemoryEventBus 创建内存事件总线
func (*InMemoryEventBus) Publish ¶
func (bus *InMemoryEventBus) Publish(event StateEvent) error
Publish 同步发布事件
func (*InMemoryEventBus) PublishAsync ¶
func (bus *InMemoryEventBus) PublishAsync(event StateEvent) error
PublishAsync 异步发布事件
func (*InMemoryEventBus) Subscribe ¶
func (bus *InMemoryEventBus) Subscribe(eventType EventType, handler EventHandler) error
Subscribe 订阅事件
func (*InMemoryEventBus) Unsubscribe ¶
func (bus *InMemoryEventBus) Unsubscribe(eventType EventType, handler EventHandler) error
Unsubscribe 取消订阅事件
type MemoryPersistence ¶
type MemoryPersistence struct {
// contains filtered or unexported fields
}
MemoryPersistence 内存持久化实现
func NewMemoryPersistence ¶
func NewMemoryPersistence() *MemoryPersistence
NewMemoryPersistence 创建内存持久化
func (*MemoryPersistence) Backup ¶
func (mp *MemoryPersistence) Backup(ctx context.Context, path string) error
Backup 备份
func (*MemoryPersistence) DeleteState ¶
func (mp *MemoryPersistence) DeleteState(ctx context.Context, stateID string) error
DeleteState 删除状态
func (*MemoryPersistence) ListStates ¶
func (mp *MemoryPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
ListStates 列出状态
func (*MemoryPersistence) LoadState ¶
func (mp *MemoryPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
LoadState 加载状态
func (*MemoryPersistence) Restore ¶
func (mp *MemoryPersistence) Restore(ctx context.Context, path string) error
Restore 恢复
func (*MemoryPersistence) SaveState ¶
func (mp *MemoryPersistence) SaveState(ctx context.Context, state *SharedState) error
SaveState 保存状态
func (*MemoryPersistence) UpdateState ¶
func (mp *MemoryPersistence) UpdateState(ctx context.Context, state *SharedState) error
UpdateState 更新状态
type Message ¶
type Message struct {
From string // 发送者 Agent ID
To string // 接收者 Agent ID(空表示广播)
Text string // 消息内容
Time time.Time // 发送时间
}
Message 群星消息
type OperationType ¶
type OperationType string
OperationType 操作类型
const ( OperationTypeSet OperationType = "set" // 设置值 OperationTypeDelete OperationType = "delete" // 删除值 OperationTypeMerge OperationType = "merge" // 合并值 OperationTypeCAS OperationType = "cas" // Compare-And-Set )
type PendingOperation ¶
type PendingOperation struct {
ID string `json:"id"`
Type OperationType `json:"type"`
Key string `json:"key"`
Value interface{} `json:"value"`
OldValue interface{} `json:"old_value"`
Timestamp time.Time `json:"timestamp"`
AgentID string `json:"agent_id"`
Retry int `json:"retry"`
Metadata map[string]interface{} `json:"metadata"`
}
PendingOperation 待处理操作
type PersistenceConfig ¶
type PersistenceConfig struct {
Type PersistenceType `json:"type"`
ConnectionString string `json:"connection_string"`
Timeout time.Duration `json:"timeout"`
MaxConnections int `json:"max_connections"`
BatchSize int `json:"batch_size"`
Compression bool `json:"compression"`
Encryption bool `json:"encryption"`
EncryptionKey string `json:"encryption_key,omitempty"`
Retention map[string]time.Duration `json:"retention"`
}
PersistenceConfig 持久化配置
type PersistenceLayer ¶
type PersistenceLayer interface {
SaveState(ctx context.Context, state *SharedState) error
LoadState(ctx context.Context, stateID string) (*SharedState, error)
UpdateState(ctx context.Context, state *SharedState) error
DeleteState(ctx context.Context, stateID string) error
ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
Backup(ctx context.Context, path string) error
Restore(ctx context.Context, path string) error
Close() error
}
PersistenceLayer 持久化层接口
func NewPersistenceLayer ¶
func NewPersistenceLayer(config *PersistenceConfig) (PersistenceLayer, error)
NewPersistenceLayer 创建持久化层
type PersistenceType ¶
type PersistenceType string
PersistenceType 持久化类型
const ( PersistenceTypeMemory PersistenceType = "memory" // 内存持久化 PersistenceTypeFile PersistenceType = "file" // 文件持久化 PersistenceTypeDatabase PersistenceType = "database" // 数据库持久化 PersistenceTypeRedis PersistenceType = "redis" // Redis持久化 )
type RedisPersistence ¶
type RedisPersistence struct {
// contains filtered or unexported fields
}
RedisPersistence Redis持久化实现
func NewRedisPersistence ¶
func NewRedisPersistence(config *PersistenceConfig) (*RedisPersistence, error)
NewRedisPersistence 创建Redis持久化
func (*RedisPersistence) Backup ¶
func (rp *RedisPersistence) Backup(ctx context.Context, path string) error
Backup 备份Redis到指定路径
func (*RedisPersistence) DeleteState ¶
func (rp *RedisPersistence) DeleteState(ctx context.Context, stateID string) error
DeleteState 从Redis删除状态
func (*RedisPersistence) ListStates ¶
func (rp *RedisPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
ListStates 列出Redis中的状态
func (*RedisPersistence) LoadState ¶
func (rp *RedisPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)
LoadState 从Redis加载状态
func (*RedisPersistence) Restore ¶
func (rp *RedisPersistence) Restore(ctx context.Context, path string) error
Restore 从指定路径恢复Redis
func (*RedisPersistence) SaveState ¶
func (rp *RedisPersistence) SaveState(ctx context.Context, state *SharedState) error
SaveState 保存状态到Redis
func (*RedisPersistence) UpdateState ¶
func (rp *RedisPersistence) UpdateState(ctx context.Context, state *SharedState) error
UpdateState 更新Redis中的状态
type SharedState ¶
type SharedState struct {
// 基本信息
// 状态数据
// 元数据
// 同步信息
AccessPolicy AccessPolicy `json:"access_policy"`
// contains filtered or unexported fields
}
SharedState 共享状态
type Stars ¶
type Stars struct {
// contains filtered or unexported fields
}
Stars 群星 - 多 Agent 协作单元 Stars 是 Aster 框架中的多 Agent 协作组件, 负责管理多个 Agent 之间的协作、通信和任务执行。
type StateEvent ¶
type StateEvent struct {
Type EventType `json:"type"`
StateID string `json:"state_id"`
State *SharedState `json:"state,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Operation *PendingOperation `json:"operation,omitempty"`
Conflict *ConflictInfo `json:"conflict,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata"`
}
StateEvent 状态事件
type StateEventHandler ¶
type StateEventHandler struct {
// contains filtered or unexported fields
}
StateEventHandler 状态事件处理器
func (*StateEventHandler) Handle ¶
func (seh *StateEventHandler) Handle(ctx context.Context, event StateEvent) error
Handle 处理状态事件
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager 状态管理器
func NewStateManager ¶
func NewStateManager(config *StateManagerConfig, eventBus EventBus, persistence PersistenceLayer) *StateManager
NewStateManager 创建状态管理器
func (*StateManager) CreateState ¶
func (sm *StateManager) CreateState(ctx context.Context, stateID, name string, stateType StateType, owners []string) (*SharedState, error)
CreateState 创建共享状态
func (*StateManager) DeleteState ¶
func (sm *StateManager) DeleteState(ctx context.Context, stateID, agentID string) error
DeleteState 删除共享状态
func (*StateManager) GetMetrics ¶
func (sm *StateManager) GetMetrics() *StateMetrics
GetMetrics 获取状态管理指标
func (*StateManager) GetState ¶
func (sm *StateManager) GetState(stateID string) (*SharedState, error)
GetState 获取共享状态
func (*StateManager) UpdateState ¶
func (sm *StateManager) UpdateState(ctx context.Context, stateID string, agentID string, updates map[string]interface{}, metadata map[string]interface{}) error
UpdateState 更新共享状态
type StateManagerConfig ¶
type StateManagerConfig struct {
// 同步配置
SyncInterval time.Duration `json:"sync_interval"`
MaxRetries int `json:"max_retries"`
RetryDelay time.Duration `json:"retry_delay"`
// 持久化配置
EnablePersistence bool `json:"enable_persistence"`
PersistenceType PersistenceType `json:"persistence_type"`
FlushInterval time.Duration `json:"flush_interval"`
// 一致性配置
ConsistencyLevel ConsistencyLevel `json:"consistency_level"`
ConflictResolution ConflictResolution `json:"conflict_resolution"`
// 监控配置
EnableMetrics bool `json:"enable_metrics"`
EnableAudit bool `json:"enable_audit"`
}
StateManagerConfig 状态管理器配置
type StateMetrics ¶
type StateMetrics struct {
TotalStates int64 `json:"total_states"`
ActiveStates int64 `json:"active_states"`
PendingOperations int64 `json:"pending_operations"`
SyncErrors int64 `json:"sync_errors"`
Conflicts int64 `json:"conflicts"`
ReadOps int64 `json:"read_ops"`
WriteOps int64 `json:"write_ops"`
DeleteOps int64 `json:"delete_ops"`
AverageSyncTime time.Duration `json:"average_sync_time"`
LastSyncTime time.Time `json:"last_sync_time"`
}
StateMetrics 状态指标
type StateSyncManager ¶
type StateSyncManager struct {
// contains filtered or unexported fields
}
StateSyncManager 状态同步管理器
func NewStateSyncManager ¶
func NewStateSyncManager(stateManager *StateManager, eventBus EventBus, config *SyncConfig) *StateSyncManager
NewStateSyncManager 创建状态同步管理器
func (*StateSyncManager) ConnectAgent ¶
func (ssm *StateSyncManager) ConnectAgent(agentID, name, address string, connType ConnectionType, states []string) (*AgentConnection, error)
ConnectAgent 连接Agent
func (*StateSyncManager) CreateSyncTask ¶
func (ssm *StateSyncManager) CreateSyncTask(taskType SyncTaskType, stateID string, targetAgents []string, updates []StateUpdate) (*SyncTask, error)
CreateSyncTask 创建同步任务
func (*StateSyncManager) DisconnectAgent ¶
func (ssm *StateSyncManager) DisconnectAgent(agentID string) error
DisconnectAgent 断开Agent连接
func (*StateSyncManager) GetAgentConnection ¶
func (ssm *StateSyncManager) GetAgentConnection(agentID string) (*AgentConnection, error)
GetAgentConnection 获取Agent连接
func (*StateSyncManager) GetMetrics ¶
func (ssm *StateSyncManager) GetMetrics() *SyncMetrics
GetMetrics 获取同步指标
func (*StateSyncManager) ListAgentConnections ¶
func (ssm *StateSyncManager) ListAgentConnections() map[string]*AgentConnection
ListAgentConnections 列出所有Agent连接
type StateUpdate ¶
type StateUpdate struct {
ID string `json:"id"`
StateID string `json:"state_id"`
AgentID string `json:"agent_id"`
Type OperationType `json:"type"`
Key string `json:"key"`
OldValue interface{} `json:"old_value"`
NewValue interface{} `json:"new_value"`
Version int64 `json:"version"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata"`
}
StateUpdate 状态更新
type SyncConfig ¶
type SyncConfig struct {
// 基础配置
SyncInterval time.Duration `json:"sync_interval"`
MaxRetries int `json:"max_retries"`
RetryDelay time.Duration `json:"retry_delay"`
Timeout time.Duration `json:"timeout"`
// 批处理配置
BatchSize int `json:"batch_size"`
FlushInterval time.Duration `json:"flush_interval"`
// 一致性配置
ConsistencyLevel ConsistencyLevel `json:"consistency_level"`
ConflictStrategy ConflictResolution `json:"conflict_strategy"`
// 压缩配置
Compression bool `json:"compression"`
CompressionType string `json:"compression_type"`
// 安全配置
Encryption bool `json:"encryption"`
EncryptionKey string `json:"encryption_key"`
// 性能配置
MaxConcurrentSyncs int `json:"max_concurrent_syncs"`
QueueBufferSize int `json:"queue_buffer_size"`
}
SyncConfig 同步配置
type SyncMetrics ¶
type SyncMetrics struct {
TotalSyncs int64 `json:"total_syncs"`
SuccessfulSyncs int64 `json:"successful_syncs"`
FailedSyncs int64 `json:"failed_syncs"`
Conflicts int64 `json:"conflicts"`
PendingTasks int64 `json:"pending_tasks"`
ActiveConnections int64 `json:"active_connections"`
AverageLatency time.Duration `json:"average_latency"`
LastSyncTime time.Time `json:"last_sync_time"`
ThroughputPerSec float64 `json:"throughput_per_sec"`
}
SyncMetrics 同步指标
type SyncStatus ¶
type SyncStatus string
SyncStatus 同步状态
const ( SyncStatusSynced SyncStatus = "synced" // 已同步 SyncStatusPending SyncStatus = "pending" // 待同步 SyncStatusConflict SyncStatus = "conflict" // 冲突 SyncStatusError SyncStatus = "error" // 错误 SyncStatusSyncing SyncStatus = "syncing" // 同步中 )
type SyncTask ¶
type SyncTask struct {
ID string `json:"id"`
Type SyncTaskType `json:"type"`
Priority int `json:"priority"`
StateID string `json:"state_id"`
TargetAgents []string `json:"target_agents"`
Updates []StateUpdate `json:"updates"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
Status TaskStatus `json:"status"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
Error string `json:"error,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
}
SyncTask 同步任务
type SyncTaskType ¶
type SyncTaskType string
SyncTaskType 同步任务类型
const ( SyncTaskTypeBroadcast SyncTaskType = "broadcast" // 广播同步 SyncTaskTypeUnicast SyncTaskType = "unicast" // 单播同步 SyncTaskTypeMulticast SyncTaskType = "multicast" // 多播同步 SyncTaskTypeFullSync SyncTaskType = "full_sync" // 全量同步 SyncTaskTypeDeltaSync SyncTaskType = "delta_sync" // 增量同步 )
type TaskStatus ¶
type TaskStatus string
TaskStatus 任务状态
const ( TaskStatusPending TaskStatus = "pending" // 待执行 TaskStatusRunning TaskStatus = "running" // 执行中 TaskStatusCompleted TaskStatus = "completed" // 已完成 TaskStatusFailed TaskStatus = "failed" // 失败 TaskStatusCancelled TaskStatus = "cancelled" // 已取消 )