Documentation
¶
Index ¶
- Constants
- Variables
- func ConfigureProfiling(config ProfilingConfig)
- func DisableMetrics()
- func EnableMetrics()
- func ExtractRequestMessage(msg interface{}) (interface{}, string, bool)
- func GenerateID() string
- func GenerateUniqueName() string
- func GetGoroutineCount() int
- func GetMemStats() runtime.MemStats
- func GetProfilingEndpoints() []string
- func GetProfilingURL(port string) string
- func GetRuntimeStats() map[string]interface{}
- func IsMetricsEnabled() bool
- func IsProfilingEnabled() bool
- func IsValidPath(path string) bool
- func PrintMetrics()
- func PrintProfilingInstructions(port string)
- func ProfilingInfo() (enabled bool, port string, started bool)
- func RecordActorCreation()
- func RecordActorTermination()
- func RecordLatency(latency int64)
- func RecordMessageReceived()
- func RecordMessageSent()
- func RecordRegistryLookup()
- func ResetMetrics()
- func SendResponse(context ActorContext, msg interface{}, result interface{}, err error) bool
- func SetFutureLogLevel(level LogLevel)
- func SetFutureLogger(logger FutureLogger)
- func ShutdownDefaultWorkerPool()
- func StartCPUProfiling(filename string) func() error
- func StartCPUProfilingWithDuration(filename string, duration time.Duration) error
- func StopProfiling()
- func ValidatePath(path string) error
- type Actor
- type ActorCell
- func (c *ActorCell) AddChild(ref ActorRef)
- func (c *ActorCell) CompareAndSwapState(oldState, newState LifecycleState) bool
- func (c *ActorCell) FutureManager() FutureManager
- func (c *ActorCell) GetChildren() []ActorRef
- func (c *ActorCell) GetState() LifecycleState
- func (c *ActorCell) IsAlive() bool
- func (c *ActorCell) Path() string
- func (c *ActorCell) RemoveChild(ref ActorRef)
- func (c *ActorCell) SetFutureManager(manager FutureManager)
- func (c *ActorCell) SetState(state LifecycleState)
- type ActorContext
- type ActorRef
- type ActorSystem
- type AllForOneStrategy
- type Config
- type ConfigOption
- type CustomFutureLogger
- type DeadLetter
- type DeadLettersActor
- type Decider
- type DefaultActor
- type DefaultFutureManager
- func (m *DefaultFutureManager) Cancel(correlationID string) bool
- func (m *DefaultFutureManager) Cleanup()
- func (m *DefaultFutureManager) Complete(responseMsg ResponseMessage) bool
- func (m *DefaultFutureManager) Count() int
- func (m *DefaultFutureManager) Create(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)
- type Directive
- type Dispatcher
- type Future
- type FutureCallback
- type FutureImpl
- func (f *FutureImpl) Await(timeout time.Duration) (interface{}, error)
- func (f *FutureImpl) Cancel()
- func (f *FutureImpl) CorrelationID() string
- func (f *FutureImpl) IsReady() bool
- func (f *FutureImpl) OnComplete(callback func(result interface{}, err error))
- func (f *FutureImpl) Result(timeout time.Duration) (interface{}, error)
- func (f *FutureImpl) State() FutureState
- type FutureLogger
- type FutureManager
- type FutureState
- type GuardianActor
- type IDGenerator
- type JSONFutureLogger
- type LifecycleActor
- type LifecycleManager
- type LifecycleState
- type LogLevel
- type Mailbox
- type MessageInvoker
- type Metrics
- type MetricsSnapshot
- type OneForOneStrategy
- type ProfilingConfig
- type Props
- type PropsFunc
- type Registry
- type ResponseMessage
- type Supervisor
- type SupervisorConfig
- type SupervisorStrategy
- type WorkerPool
Constants ¶
const (
// NumRegistryBuckets 分片注册表的桶数量
NumRegistryBuckets = 256
)
Variables ¶
var ( // ErrActorNotFound actor未找到错误 ErrActorNotFound = errors.New("actor not found") // ErrActorStopped actor已停止错误 ErrActorStopped = errors.New("actor stopped") // DeadLetters 死信actor引用 DeadLetters ActorRef // NoSender 无发送者引用 NoSender ActorRef )
var ( // ErrFutureTimeout 表示Future等待超时 ErrFutureTimeout = errors.New("future: timeout waiting for result") // ErrFutureCancelled 表示Future被取消 ErrFutureCancelled = errors.New("future: operation cancelled") )
var ( // ErrSystemNameTaken 系统名称已被占用 ErrSystemNameTaken = errors.New("system name already taken") // ErrSystemNotFound 系统未找到 ErrSystemNotFound = errors.New("system not found") // ErrPathExists 路径已存在 ErrPathExists = errors.New("path already exists") // ErrParentNotExists 父路径不存在 ErrParentNotExists = errors.New("parent path not exists") // ErrSystemShutdown 系统已关闭 ErrSystemShutdown = errors.New("system is shutdown") )
Functions ¶
func ConfigureProfiling ¶
func ConfigureProfiling(config ProfilingConfig)
ConfigureProfiling configures profiling based on environment variables or provided config
func ExtractRequestMessage ¶
ExtractRequestMessage 从Future请求消息中提取原始消息和correlation ID 返回 (原始消息, correlation ID, 是否是Future请求)
func GetGoroutineCount ¶
func GetGoroutineCount() int
GetGoroutineCount returns the current goroutine count
func GetProfilingEndpoints ¶
func GetProfilingEndpoints() []string
GetProfilingEndpoints returns available profiling endpoints
func GetProfilingURL ¶
GetProfilingURL returns the URL for accessing profiling data
func GetRuntimeStats ¶
func GetRuntimeStats() map[string]interface{}
GetRuntimeStats returns current runtime statistics
func IsMetricsEnabled ¶
func IsMetricsEnabled() bool
IsMetricsEnabled returns true if metrics collection is enabled
func IsProfilingEnabled ¶
func IsProfilingEnabled() bool
IsProfilingEnabled returns true if profiling is enabled
func PrintProfilingInstructions ¶
func PrintProfilingInstructions(port string)
PrintProfilingInstructions prints instructions for using profiling
func ProfilingInfo ¶
ProfilingInfo returns information about the profiling configuration
func RecordActorTermination ¶
func RecordActorTermination()
RecordActorTermination records an actor termination
func RecordLatency ¶
func RecordLatency(latency int64)
RecordLatency records a latency value in nanoseconds
func RecordMessageReceived ¶
func RecordMessageReceived()
RecordMessageReceived records a received message
func RecordRegistryLookup ¶
func RecordRegistryLookup()
RecordRegistryLookup records a registry lookup
func SendResponse ¶
func SendResponse(context ActorContext, msg interface{}, result interface{}, err error) bool
SendResponse 发送Future响应给sender 供接收Future请求的Actor使用 context是调用者的ActorContext msg是接收到的原始消息(包含correlation ID) result是响应结果 err是响应错误(可选)
func ShutdownDefaultWorkerPool ¶
func ShutdownDefaultWorkerPool()
ShutdownDefaultWorkerPool 关闭默认worker池
func StartCPUProfiling ¶
StartCPUProfiling starts CPU profiling (manual trigger) Returns a stop function that should be called to stop profiling and write the profile
func StartCPUProfilingWithDuration ¶
StartCPUProfilingWithDuration starts CPU profiling for a specified duration Automatically stops profiling after the duration and writes to the specified file
Types ¶
type Actor ¶
type Actor interface {
// Receive 处理接收到的消息,通过ctx获取上下文信息
Receive(ctx ActorContext)
}
Actor 定义Actor接口,处理接收到的消息
type ActorCell ¶
type ActorCell struct {
Props *Props
Mailbox Mailbox
Dispatcher Dispatcher
Ref ActorRef
Parent ActorRef
Children map[string]ActorRef // 使用map实现O(1)查找和删除
ChildrenMutex sync.RWMutex // 保护Children的读写锁
State atomic.Int32 // LifecycleState
// contains filtered or unexported fields
}
ActorCell 封装actor的元数据和状态
func NewActorCell ¶
NewActorCell 创建新的actor cell
func (*ActorCell) CompareAndSwapState ¶
func (c *ActorCell) CompareAndSwapState(oldState, newState LifecycleState) bool
CompareAndSwapState 比较并交换状态
func (*ActorCell) FutureManager ¶
func (c *ActorCell) FutureManager() FutureManager
FutureManager 获取FutureManager
func (*ActorCell) SetFutureManager ¶
func (c *ActorCell) SetFutureManager(manager FutureManager)
SetFutureManager 设置FutureManager
type ActorContext ¶
type ActorContext interface {
// Self 获取actor自己的引用
Self() ActorRef
// Sender 获取消息发送者引用
Sender() ActorRef
// Message 获取当前正在处理的消息
Message() interface{}
// Tell 发送消息给其他actor
Tell(ref ActorRef, msg interface{}) error
// Ask 发送消息并等待响应(Future模式)
// 返回一个Future,可以等待结果或注册回调
Ask(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)
// Stop 停止当前actor
Stop() error
// Spawn 创建子actor
Spawn(props *Props) (ActorRef, error)
// Path 获取actor路径
Path() string
// Parent 获取父actor引用
Parent() ActorRef
}
ActorContext 提供actor的元信息和操作接口
type ActorRef ¶
type ActorRef interface {
// Tell 异步发送消息给actor
Tell(msg interface{}) error
// Path 返回actor的路径
Path() string
// IsAlive 检查actor是否存活
IsAlive() bool
// Equals 比较两个引用是否指向同一个actor
Equals(other ActorRef) bool
}
ActorRef 代表对actor的引用
func NewActorRef ¶
func NewActorRef(path string, system ActorSystem, registry Registry) ActorRef
NewActorRef 创建actor引用
type ActorSystem ¶
type ActorSystem interface {
// Name 返回系统名称
Name() string
// Spawn 创建顶级actor
Spawn(props *Props) (ActorRef, error)
// SpawnWithName 创建指定名称的顶级actor
SpawnWithName(props *Props, name string) (ActorRef, error)
// Stop 停止指定actor
Stop(ref ActorRef) error
// Suspend 暂停actor
Suspend(ref ActorRef) error
// Resume 恢复actor
Resume(ref ActorRef) error
// Restart 重启actor
Restart(ref ActorRef) error
// Lookup 按路径查找actor
Lookup(path string) (ActorRef, bool)
// Shutdown 优雅关闭整个系统
Shutdown(timeout *int) error
// ShutdownNow 立即关闭系统
ShutdownNow()
// DeadLetters 返回死信actor引用
DeadLetters() ActorRef
// Root 获取根guardian引用
Root() ActorRef
// User 获取user guardian引用
User() ActorRef
// System 获取system guardian引用
System() ActorRef
// SystemActorOf 创建系统级actor
SystemActorOf(props *Props, name string) (ActorRef, error)
// LookupDispatcher 按名称查找dispatcher
LookupDispatcher(name string) (Dispatcher, bool)
// RegisterDispatcher 注册dispatcher
RegisterDispatcher(name string, dispatcher Dispatcher)
}
ActorSystem actor系统接口
func NewActorSystem ¶
func NewActorSystem(name string, opts ...ConfigOption) (ActorSystem, error)
NewActorSystem 创建新的actor系统
func NewActorSystemFromConfig ¶
func NewActorSystemFromConfig(config *Config) (ActorSystem, error)
NewActorSystemFromConfig 从配置创建actor系统
type AllForOneStrategy ¶
type AllForOneStrategy struct {
// contains filtered or unexported fields
}
AllForOneStrategy AllForOne监督策略
func (*AllForOneStrategy) Decider ¶
func (s *AllForOneStrategy) Decider(child ActorRef, reason interface{}) Directive
AllForOneStrategy 实现 SupervisorStrategy 接口
func (*AllForOneStrategy) HandleFailure ¶
func (s *AllForOneStrategy) HandleFailure(child ActorRef, reason interface{})
func (*AllForOneStrategy) SetSystem ¶
func (s *AllForOneStrategy) SetSystem(system ActorSystem)
SetSystem 设置ActorSystem引用
type Config ¶
type Config struct {
Name string
DefaultDispatcher Dispatcher
DefaultMailboxSize int
DefaultSupervisor SupervisorStrategy
RootGuardian string
UserGuardian string
SystemGuardian string
}
Config 系统配置
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption 配置选项函数
func WithDefaultDispatcher ¶
func WithDefaultDispatcher(d Dispatcher) ConfigOption
WithDefaultDispatcher 设置默认dispatcher
func WithDefaultMailboxSize ¶
func WithDefaultMailboxSize(size int) ConfigOption
WithDefaultMailboxSize 设置默认mailbox大小
func WithDefaultPinnedDispatcher ¶
func WithDefaultPinnedDispatcher() ConfigOption
WithDefaultPinnedDispatcher 设置默认的pinning dispatcher
func WithDefaultSupervisor ¶
func WithDefaultSupervisor(s SupervisorStrategy) ConfigOption
WithDefaultSupervisor 设置默认监督策略
type CustomFutureLogger ¶
type CustomFutureLogger struct {
DebugFunc func(format string, args ...interface{})
InfoFunc func(format string, args ...interface{})
WarnFunc func(format string, args ...interface{})
ErrorFunc func(format string, args ...interface{})
}
CustomFutureLogger 允许用户自定义日志实现
func (*CustomFutureLogger) Debug ¶
func (l *CustomFutureLogger) Debug(format string, args ...interface{})
Debug 实现
func (*CustomFutureLogger) Error ¶
func (l *CustomFutureLogger) Error(format string, args ...interface{})
Error 实现
func (*CustomFutureLogger) Info ¶
func (l *CustomFutureLogger) Info(format string, args ...interface{})
Info 实现
func (*CustomFutureLogger) Warn ¶
func (l *CustomFutureLogger) Warn(format string, args ...interface{})
Warn 实现
type DeadLetter ¶
type DeadLetter struct {
Message interface{}
Sender ActorRef
Recipient ActorRef
Timestamp string // RFC3339格式时间戳
}
DeadLetter 死信结构
type DeadLettersActor ¶
type DeadLettersActor struct {
*DefaultActor
// contains filtered or unexported fields
}
DeadLettersActor 死信actor实现
func (*DeadLettersActor) GetDeadLetters ¶
func (a *DeadLettersActor) GetDeadLetters() []DeadLetter
GetDeadLetters 返回保存的死信列表
func (*DeadLettersActor) GetDropCount ¶
func (a *DeadLettersActor) GetDropCount() int64
GetDropCount 返回丢弃的消息总数
func (*DeadLettersActor) Receive ¶
func (a *DeadLettersActor) Receive(ctx ActorContext)
type DefaultFutureManager ¶
type DefaultFutureManager struct {
// contains filtered or unexported fields
}
DefaultFutureManager 是FutureManager的默认实现
func (*DefaultFutureManager) Cancel ¶
func (m *DefaultFutureManager) Cancel(correlationID string) bool
Cancel 取消一个Future
func (*DefaultFutureManager) Cleanup ¶
func (m *DefaultFutureManager) Cleanup()
Cleanup 清理已完成或超时的Future
func (*DefaultFutureManager) Complete ¶
func (m *DefaultFutureManager) Complete(responseMsg ResponseMessage) bool
Complete 完成一个Future,将响应传递给等待者
func (*DefaultFutureManager) Count ¶
func (m *DefaultFutureManager) Count() int
Count 返回当前活跃的Future数量
type Directive ¶
type Directive int
Directive 定义监督决策指令
func EscalatingDecider ¶
func EscalatingDecider(reason interface{}) Directive
EscalatingDecider 总是向上传播故障
func StoppingDecider ¶
func StoppingDecider(reason interface{}) Directive
StoppingDecider 总是停止故障actor
type Dispatcher ¶
type Dispatcher interface {
// Schedule 调度任务执行
Schedule(fn func())
// Throughput 返回每次处理的消息数量
Throughput() int
// Name 返回dispatcher名称
Name() string
}
Dispatcher 调度器接口,用于调度actor的消息处理
func NewCallingThreadDispatcher ¶
func NewCallingThreadDispatcher() Dispatcher
NewCallingThreadDispatcher 创建调用线程dispatcher
func NewDefaultDispatcher ¶
func NewDefaultDispatcher() Dispatcher
NewDefaultDispatcher 创建默认dispatcher
type Future ¶
type Future interface {
// CorrelationID 返回Future的关联ID
CorrelationID() string
// Result 等待并返回结果,支持超时控制
// 如果超时则返回 ErrFutureTimeout
// 如果被取消则返回 ErrFutureCancelled
// 如果操作失败则返回相应的错误
Result(timeout time.Duration) (interface{}, error)
// Await 是Result的别名,提供更简洁的API
Await(timeout time.Duration) (interface{}, error)
// OnComplete 注册一个回调函数,当Future完成时被调用
// 如果Future已经完成,回调将立即被调用
OnComplete(callback func(result interface{}, err error))
// Cancel 取消Future操作
Cancel()
// IsReady 检查Future是否已完成
IsReady() bool
// State 返回Future当前的状态
State() FutureState
}
Future 表示一个异步操作的结果
type FutureCallback ¶
type FutureCallback func(result interface{}, err error)
FutureCallback 表示Future完成时的回调函数
type FutureImpl ¶
type FutureImpl struct {
// contains filtered or unexported fields
}
FutureImpl 是Future接口的实现
func (*FutureImpl) Await ¶
func (f *FutureImpl) Await(timeout time.Duration) (interface{}, error)
Await 是Result的别名,提供更简洁的API
func (*FutureImpl) CorrelationID ¶
func (f *FutureImpl) CorrelationID() string
CorrelationID 返回Future的关联ID
func (*FutureImpl) OnComplete ¶
func (f *FutureImpl) OnComplete(callback func(result interface{}, err error))
OnComplete 注册一个回调函数,当Future完成时被调用
type FutureLogger ¶
type FutureLogger interface {
// Debug 记录调试日志
Debug(format string, args ...interface{})
// Info 记录信息日志
Info(format string, args ...interface{})
// Warn 记录警告日志
Warn(format string, args ...interface{})
// Error 记录错误日志
Error(format string, args ...interface{})
}
FutureLogger 定义Future的日志接口
func NewCustomLogger ¶
func NewCustomLogger( debugFunc func(format string, args ...interface{}), infoFunc func(format string, args ...interface{}), warnFunc func(format string, args ...interface{}), errorFunc func(format string, args ...interface{}), ) FutureLogger
NewCustomLogger 创建自定义日志器 参数都是可选的,nil表示该级别的日志被禁用
func NewJSONFutureLogger ¶
func NewJSONFutureLogger(output *os.File, level LogLevel) FutureLogger
NewJSONFutureLogger 创建JSON日志器
type FutureManager ¶
type FutureManager interface {
// Create 创建一个新的Future并发送消息
// 为消息包装correlation ID并返回Future实例
Create(target ActorRef, msg interface{}, timeout time.Duration) (Future, error)
// Complete 完成一个Future,将响应传递给等待者
Complete(responseMsg ResponseMessage) bool
// Cancel 取消一个Future
Cancel(correlationID string) bool
// Cleanup 清理已完成或超时的Future
Cleanup()
// Count 返回当前活跃的Future数量
Count() int
}
FutureManager 管理Actor的所有Future请求
type FutureState ¶
type FutureState int32
FutureState 表示Future的状态
const ( // FutureStatePending 表示Future等待中 FutureStatePending FutureState = iota // FutureStateCompleted 表示Future已完成(成功或失败) FutureStateCompleted // FutureStateTimeout表示Future超时 FutureStateTimeout // FutureStateCancelled 表示Future已取消 FutureStateCancelled )
type GuardianActor ¶
type GuardianActor struct {
*DefaultActor
// contains filtered or unexported fields
}
GuardianActor guardian actor实现
func (*GuardianActor) Receive ¶
func (g *GuardianActor) Receive(ctx ActorContext)
type IDGenerator ¶
type IDGenerator struct {
// contains filtered or unexported fields
}
IDGenerator 轻量级ID生成器,用于替代uuid.New()
func GetGlobalIDGenerator ¶
func GetGlobalIDGenerator() *IDGenerator
GetGlobalIDGenerator 获取全局ID生成器(单例)
func (*IDGenerator) Generate ¶
func (g *IDGenerator) Generate() string
Generate 生成唯一的ID字符串 格式: {nodeID}-{counter}
type JSONFutureLogger ¶
type JSONFutureLogger struct {
// contains filtered or unexported fields
}
JSONFutureLogger 输出JSON格式的日志,适合日志聚合系统
func (*JSONFutureLogger) Debug ¶
func (l *JSONFutureLogger) Debug(format string, args ...interface{})
Debug 实现
func (*JSONFutureLogger) Error ¶
func (l *JSONFutureLogger) Error(format string, args ...interface{})
Error 实现
func (*JSONFutureLogger) Info ¶
func (l *JSONFutureLogger) Info(format string, args ...interface{})
Info 实现
func (*JSONFutureLogger) Warn ¶
func (l *JSONFutureLogger) Warn(format string, args ...interface{})
Warn 实现
type LifecycleActor ¶
type LifecycleActor interface {
Actor
// PreStart 在actor启动前调用
PreStart(ctx ActorContext) error
// PostStop 在actor停止后调用
PostStop(ctx ActorContext) error
// PreRestart 在actor重启前调用(子actor停止后)
PreRestart(ctx ActorContext, reason interface{})
// PostRestart 在actor重启后调用(PreStart之前)
PostRestart(ctx ActorContext, reason interface{})
}
LifecycleActor 定义生命周期挂钩接口
type LifecycleManager ¶
type LifecycleManager struct {
// contains filtered or unexported fields
}
LifecycleManager 管理actor生命周期
func NewLifecycleManager ¶
func NewLifecycleManager(cell *ActorCell) *LifecycleManager
NewLifecycleManager 创建生命周期管理器
func (*LifecycleManager) CanTransition ¶
func (lm *LifecycleManager) CanTransition(from, to LifecycleState) bool
CanTransition 检查是否可以转换状态
type LifecycleState ¶
type LifecycleState int32
LifecycleState 定义actor生命周期状态
const ( Created LifecycleState = iota // 已创建但未启动 StartInitiated // 启动中 Started // 已启动,正常运行 Suspended // 已暂停,不处理消息 Stopping // 停止中 Stopped // 已停止 Restarter // 重启中 )
func (LifecycleState) String ¶
func (s LifecycleState) String() string
type Mailbox ¶
type Mailbox interface {
// PostUserMessage 投递用户消息
PostUserMessage(msg interface{})
// PostSystemMessage 投递系统消息
PostSystemMessage(msg interface{})
// UserMessageCount 获取用户消息数量
UserMessageCount() int
}
Mailbox 邮箱接口,用于消息处理和故障上报
func NewMailbox ¶
func NewMailbox(cell *ActorCell, dispatcher Dispatcher) Mailbox
NewMailbox 创建新的mailbox
type MessageInvoker ¶
type MessageInvoker interface {
// Invoke 调用actor处理消息
Invoke(receiver Actor, msg interface{})
// EscalateFailure 将故障上报给监督者
EscalateFailure(reason string, msg interface{})
}
MessageInvoker 邮箱接口,用于消息处理和故障上报
type Metrics ¶
type Metrics struct {
MessagesSent atomic.Uint64
MessagesReceived atomic.Uint64
ActorCreations atomic.Uint64
ActorTerminations atomic.Uint64
RegistryLookups atomic.Uint64
LockAcquisitions atomic.Uint64
// Latency measurements (in nanoseconds)
TotalLatency atomic.Uint64
LatencyCount atomic.Uint64
// Start time for rate calculations
StartTime atomic.Int64
}
Metrics represents performance metric collection
type MetricsSnapshot ¶
type MetricsSnapshot struct {
MessagesPerSec float64
MessagesReceived uint64
MessagesSent uint64
ActiveActors uint64
ActorCreations uint64
RegistryLookupsSec float64
AvgLatency float64
Uptime time.Duration
}
MetricsSnapshot represents a snapshot of current metrics
func GetMetricsSnapshot ¶
func GetMetricsSnapshot() MetricsSnapshot
GetMetricsSnapshot returns a snapshot of current metrics with calculated rates
type OneForOneStrategy ¶
type OneForOneStrategy struct {
// contains filtered or unexported fields
}
OneForOneStrategy OneForOne监督策略
func (*OneForOneStrategy) Decider ¶
func (s *OneForOneStrategy) Decider(child ActorRef, reason interface{}) Directive
OneForOneStrategy 实现 SupervisorStrategy 接口
func (*OneForOneStrategy) HandleFailure ¶
func (s *OneForOneStrategy) HandleFailure(child ActorRef, reason interface{})
func (*OneForOneStrategy) SetSystem ¶
func (s *OneForOneStrategy) SetSystem(system ActorSystem)
SetSystem 设置ActorSystem引用
type ProfilingConfig ¶
ProfilingConfig holds profiling configuration
func DefaultProfilingConfig ¶
func DefaultProfilingConfig() ProfilingConfig
DefaultProfilingConfig returns default profiling configuration
type Props ¶
type Props struct {
Actor func() Actor // actor工厂函数
Name string // actor名称
Mailbox Mailbox // 自定义mailbox(可选)
Dispatcher Dispatcher // 自定义dispatcher(可选)
Supervisor SupervisorConfig // 监督配置
}
Props 用于配置actor的创建参数
func (*Props) WithDispatcher ¶
func (p *Props) WithDispatcher(dispatcher Dispatcher) *Props
WithDispatcher 设置dispatcher
func (*Props) WithMailbox ¶
WithMailbox 设置mailbox
func (*Props) WithSupervisor ¶
func (p *Props) WithSupervisor(strategy SupervisorStrategy) *Props
WithSupervisor 设置监督策略
type Registry ¶
type Registry interface {
// Register 注册actor到指定路径
Register(path string, cell *ActorCell) error
// Unregister 注销actor
Unregister(path string)
// Lookup 查找路径对应的actor
Lookup(path string) (*ActorCell, bool)
// Resolve 解析路径字符串为actor引用
Resolve(path string) (ActorRef, bool)
// All 返回所有注册的actor路径
All() []string
// Children 返回父路径下所有子actor路径
Children(parent string) []string
// Count 返回注册的actor总数
Count() int
}
Registry 接口用于管理actor注册
type ResponseMessage ¶
type ResponseMessage struct {
CorrelationID string // 关联ID,用于匹配Future请求
Result interface{} // 响应结果
Error error // 响应错误
}
ResponseMessage 表示Future的响应消息
func CreateResponse ¶
func CreateResponse(correlationID string, result interface{}, err error) ResponseMessage
CreateResponse 创建Future响应消息
type Supervisor ¶
type Supervisor interface {
// HandleChildFailure 处理子actor故障
HandleChildFailure(child *ActorCell, reason interface{})
}
Supervisor 监督者接口
type SupervisorConfig ¶
type SupervisorConfig struct {
Strategy SupervisorStrategy
StopChildren bool
}
SupervisorConfig 监督配置
type SupervisorStrategy ¶
type SupervisorStrategy interface {
// Decider 决定如何处理子actor故障
Decider(child ActorRef, reason interface{}) Directive
// HandleFailure 处理子actor失败
HandleFailure(child ActorRef, reason interface{})
// SetSystem 设置ActorSystem引用
SetSystem(system ActorSystem)
}
SupervisorStrategy 监督策略接口
func NewAllForOneStrategy ¶
func NewAllForOneStrategy(maxNrOfRetries int, withinTimeRange time.Duration, decider Decider) SupervisorStrategy
NewAllForOneStrategy 创建AllForOne策略
func NewOneForOneStrategy ¶
func NewOneForOneStrategy(maxNrOfRetries int, withinTimeRange time.Duration, decider Decider) SupervisorStrategy
NewOneForOneStrategy 创建OneForOne策略
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool goroutine池,用于复用goroutine减少调度开销
func GetDefaultWorkerPool ¶
func GetDefaultWorkerPool() *WorkerPool
GetDefaultWorkerPool 获取默认的worker池(单例)
Source Files
¶
- actor.go
- actor_cell.go
- context.go
- context_impl.go
- default_ref.go
- dispatcher.go
- dispatcher_types.go
- future.go
- future_logger.go
- future_manager.go
- guardians.go
- id_generator.go
- lifecycle.go
- mailbox.go
- mailbox_impl.go
- metrics.go
- profiling.go
- props.go
- ref.go
- registry.go
- sharded_registry.go
- supervisor.go
- system.go
- worker_pool.go