actor

package
v0.19.0
Warning

This package is not in the latest version of its module.
Published: Nov 28, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

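Package actor provides a lightweight Actor model implementation. Design principles:

  • Minimal dependencies: only the standard library is used
  • Compatible with the existing Agent/EventBus architecture
  • Supports local actors, with interfaces reserved for a future distributed extension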

Types

type Actor

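type Actor interface {
	// Receive handles an incoming message.
	// ctx provides the actor's context; msg is the received message.
	Receive(ctx *Context, msg Message)
}

Actor is the actor interface. Any type that implements it can serve as an actor.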

type ActorFunc

type ActorFunc func(ctx *Context, msg Message)

ActorFunc is a function-style actor, convenient for quickly creating simple actors.

func (ActorFunc) Receive

func (f ActorFunc) Receive(ctx *Context, msg Message)

Receive implements the Actor interface.
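The function-adapter pattern behind ActorFunc can be sketched as follows. This is a self-contained illustration, not the package's source: Message, Context, Actor and ActorFunc are re-declared locally as minimal stand-ins, the body of Receive (calling the wrapped function) is an assumption, and Greeting and collect are hypothetical names.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented declarations. The real
// Context carries more fields and methods than shown here.
type Message interface{ Kind() string }

type Context struct{}

type Actor interface {
	Receive(ctx *Context, msg Message)
}

// ActorFunc adapts a plain function to the Actor interface.
type ActorFunc func(ctx *Context, msg Message)

// Receive calls the wrapped function (assumed behavior).
func (f ActorFunc) Receive(ctx *Context, msg Message) { f(ctx, msg) }

// Greeting is a hypothetical message type used only in this example.
type Greeting struct{ Name string }

func (Greeting) Kind() string { return "greeting" }

// collect returns an Actor that appends the Kind of every message it
// receives to seen.
func collect(seen *[]string) Actor {
	return ActorFunc(func(_ *Context, msg Message) {
		*seen = append(*seen, msg.Kind())
	})
}

func main() {
	var seen []string
	a := collect(&seen)
	a.Receive(&Context{}, Greeting{Name: "gopher"})
	fmt.Println(seen) // prints [greeting]
}
```

The same shape is used by http.HandlerFunc in the standard library: a closure becomes an interface value without declaring a struct type.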

type AllForOneStrategy

type AllForOneStrategy struct {
	MaxRestarts    int
	WithinDuration time.Duration
	Decider        Decider
	// contains filtered or unexported fields
}

AllForOneStrategy is the all-for-one strategy: when one child actor fails, all child actors are restarted.

func NewAllForOneStrategy

func NewAllForOneStrategy(maxRestarts int, within time.Duration, decider Decider) *AllForOneStrategy

NewAllForOneStrategy creates an all-for-one strategy.

func (*AllForOneStrategy) HandleFailure

func (s *AllForOneStrategy) HandleFailure(system *System, child *PID, msg Message, err interface{}) interface{}

HandleFailure implements SupervisorStrategy.

type ChildSpec

type ChildSpec struct {
	Name    string
	Factory func() Actor
	Props   *Props
}

ChildSpec is the specification of a child actor.

type CompositeStrategy

type CompositeStrategy struct {
	// contains filtered or unexported fields
}

CompositeStrategy is a composite strategy that selects a different strategy depending on the error type.

func NewCompositeStrategy

func NewCompositeStrategy(fallback SupervisorStrategy) *CompositeStrategy

NewCompositeStrategy creates a composite strategy.

func (*CompositeStrategy) HandleFailure

func (s *CompositeStrategy) HandleFailure(system *System, child *PID, msg Message, err interface{}) interface{}

HandleFailure implements SupervisorStrategy.

func (*CompositeStrategy) RegisterStrategy

func (s *CompositeStrategy) RegisterStrategy(errType string, strategy SupervisorStrategy)

RegisterStrategy registers a strategy for a specific error type.
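One way such a per-error-type lookup with a fallback could work is sketched below. The page does not say how the errType string is matched against a failure value; using fmt.Sprintf("%T", err) as the key is purely an assumption, and composite, handler, register and handle are hypothetical stand-ins rather than the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// Directive is a reduced stand-in for the documented type.
type Directive int

const (
	Restart Directive = iota
	Stop
)

// handler is a simplified stand-in for SupervisorStrategy.HandleFailure.
type handler func(err interface{}) Directive

// composite picks a handler by the dynamic type name of the failure.
type composite struct {
	byType   map[string]handler
	fallback handler
}

func newComposite(fallback handler) *composite {
	return &composite{byType: make(map[string]handler), fallback: fallback}
}

func (c *composite) register(errType string, h handler) { c.byType[errType] = h }

// handle looks up the handler under fmt.Sprintf("%T", err) and uses the
// fallback when nothing is registered. The key format is an assumption.
func (c *composite) handle(err interface{}) Directive {
	if h, ok := c.byType[fmt.Sprintf("%T", err)]; ok {
		return h(err)
	}
	return c.fallback(err)
}

// demo returns the directives chosen for a string panic value and for an
// error value that has no registered handler.
func demo() (Directive, Directive) {
	c := newComposite(func(interface{}) Directive { return Restart })
	c.register("string", func(interface{}) Directive { return Stop })
	return c.handle("boom"), c.handle(errors.New("io failure"))
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // prints 1 0 (Stop, then the Restart fallback)
}
```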

type Context

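type Context struct {
	// Self is the PID of the current actor.
	Self *PID
	// Sender is the PID of the message sender, if any.
	Sender *PID
	// Parent is the PID of the parent actor, if any.
	Parent *PID
	// Children lists the child actors.
	Children []*PID
	// contains filtered or unexported fields
}

Context is the actor execution context. It provides the environment information and operations an actor needs while running.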

func (*Context) Context

func (c *Context) Context() context.Context

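Context returns the Go context.

func (*Context) Forward

func (c *Context) Forward(target *PID)

Forward forwards the current message to another actor.

func (*Context) Message

func (c *Context) Message() Message

Message returns the message currently being processed.

func (*Context) Reply

func (c *Context) Reply(msg Message)

Reply sends a reply to the sender. In Request/Response mode, the response is returned through a channel. If there is a Sender, the response is sent to it as a message.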
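The two reply routes described for Reply can be pictured with the sketch below. The envelope type, its fields, and the "dropped" outcome when neither route exists are all assumptions made for illustration; the real Context keeps this state in unexported fields that are not documented here.

```go
package main

import "fmt"

// envelope is a hypothetical stand-in for the state a Context would need:
// an optional reply channel and an optional sender mailbox.
type envelope struct {
	replyCh chan string // set for request/response calls
	sender  chan string // set when the message carried a sender
}

// reply prefers the reply channel and otherwise uses the sender's mailbox.
// It reports which route was taken; "dropped" is an assumption about the
// case where neither exists. Both channels are expected to be buffered so
// the send does not block.
func reply(env envelope, resp string) string {
	switch {
	case env.replyCh != nil:
		env.replyCh <- resp
		return "channel"
	case env.sender != nil:
		env.sender <- resp
		return "sender"
	default:
		return "dropped"
	}
}

func main() {
	req := envelope{replyCh: make(chan string, 1)}
	route := reply(req, "pong")
	fmt.Println(route, <-req.replyCh)

	told := envelope{sender: make(chan string, 1)}
	route = reply(told, "ack")
	fmt.Println(route, <-told.sender)

	fmt.Println(reply(envelope{}, "lost"))
}
```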

func (*Context) Spawn

func (c *Context) Spawn(actor Actor, name string) *PID

Spawn creates a child actor.

func (*Context) SpawnWithProps

func (c *Context) SpawnWithProps(actor Actor, props *Props) *PID

SpawnWithProps creates a child actor with the given props.

func (*Context) Stop

func (c *Context) Stop(pid *PID)

Stop stops the specified actor.

func (*Context) StopSelf

func (c *Context) StopSelf()

StopSelf stops the current actor.

type Decider

type Decider func(err interface{}) Directive

Decider is the decision function type.

type Directive

type Directive int

Directive is a supervision directive.

const (
	// DirectiveResume resumes the actor, which continues processing messages.
	DirectiveResume Directive = iota
	// DirectiveRestart restarts the actor.
	DirectiveRestart
	// DirectiveStop stops the actor.
	DirectiveStop
	// DirectiveEscalate escalates the failure to the parent actor.
	DirectiveEscalate
	// DirectiveRestartAfter restarts the actor after a delay.
	DirectiveRestartAfter
)

func DefaultDecider

func DefaultDecider(err interface{}) Directive

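DefaultDecider is the default decider. It restarts the actor on every error.

func EscalatingDecider

func EscalatingDecider(err interface{}) Directive

EscalatingDecider is the escalating decider. It escalates every error.

func ResumingDecider

func ResumingDecider(err interface{}) Directive

ResumingDecider is the resuming decider. It resumes on every error (the error is ignored and the actor keeps running).

func StoppingDecider

func StoppingDecider(err interface{}) Directive

StoppingDecider is the stopping decider. It stops the actor on every error.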
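Each built-in decider above returns one fixed directive. A custom function with the Decider signature can instead branch on the failure value, as in this sketch. Directive and Decider are re-declared locally so the example is self-contained; classify and errTransient are hypothetical names, and the mapping from failure kinds to directives is only an illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Directive int

const (
	DirectiveResume Directive = iota
	DirectiveRestart
	DirectiveStop
	DirectiveEscalate
	DirectiveRestartAfter
)

type Decider func(err interface{}) Directive

// errTransient is a hypothetical sentinel error for this example.
var errTransient = errors.New("transient failure")

// classify resumes on transient errors, restarts on other errors, stops on
// string panic values, and escalates anything else.
func classify(err interface{}) Directive {
	switch e := err.(type) {
	case error:
		if errors.Is(e, errTransient) {
			return DirectiveResume
		}
		return DirectiveRestart
	case string:
		return DirectiveStop
	default:
		return DirectiveEscalate
	}
}

// Compile-time check that classify has the Decider shape.
var _ Decider = classify

func main() {
	wrapped := fmt.Errorf("fetch: %w", errTransient)
	fmt.Println(classify(wrapped), classify(errors.New("other")), classify("boom"), classify(42))
	// prints 0 1 2 3
}
```

Because the failure arrives as interface{}, it may be a recovered panic value rather than an error, which is why the type switch handles non-error cases explicitly.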

func (Directive) String

func (d Directive) String() string

String returns the name of the directive.

type DirectiveWithDelay

type DirectiveWithDelay struct {
	Directive Directive
	Delay     time.Duration
}

DirectiveWithDelay is a directive with a delay.

type DispatcherType

type DispatcherType int

DispatcherType is the dispatcher type.

const (
	// DispatcherDefault is the default dispatcher (one goroutine per actor).
	DispatcherDefault DispatcherType = iota
	// DispatcherShared is the shared dispatcher (multiple actors share a goroutine pool).
	DispatcherShared
)

type ExponentialBackoffStrategy

type ExponentialBackoffStrategy struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	MaxRestarts  int
	Decider      Decider
	// contains filtered or unexported fields
}

ExponentialBackoffStrategy is the exponential backoff strategy: the interval between restarts increases gradually.

func NewExponentialBackoffStrategy

func NewExponentialBackoffStrategy(initialDelay, maxDelay time.Duration, maxRestarts int, decider Decider) *ExponentialBackoffStrategy

NewExponentialBackoffStrategy creates an exponential backoff strategy.

func (*ExponentialBackoffStrategy) HandleFailure

func (s *ExponentialBackoffStrategy) HandleFailure(system *System, child *PID, msg Message, err interface{}) interface{}

HandleFailure implements SupervisorStrategy.

func (*ExponentialBackoffStrategy) Reset

func (s *ExponentialBackoffStrategy) Reset()

Reset resets the backoff state.
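A capped exponential delay of the kind InitialDelay and MaxDelay suggest could be computed as below. The doubling factor and the absence of jitter are assumptions; the page only states that the restart interval grows. backoffDelay and exampleSchedule are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns initial doubled once per previous attempt, capped
// at maxDelay. attempt 0 yields the initial delay.
func backoffDelay(initial, maxDelay time.Duration, attempt int) time.Duration {
	if initial >= maxDelay {
		return maxDelay
	}
	d := initial
	for i := 0; i < attempt; i++ {
		// Check before doubling so the value can never overflow.
		if d > maxDelay/2 {
			return maxDelay
		}
		d *= 2
	}
	return d
}

// exampleSchedule lists the first six delays for a 100ms initial delay
// and a 1s cap.
func exampleSchedule() []time.Duration {
	delays := make([]time.Duration, 0, 6)
	for attempt := 0; attempt < 6; attempt++ {
		delays = append(delays, backoffDelay(100*time.Millisecond, time.Second, attempt))
	}
	return delays
}

func main() {
	fmt.Println(exampleSchedule()) // prints [100ms 200ms 400ms 800ms 1s 1s]
}
```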

type Message

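type Message interface {
	// Kind returns the message type identifier, used for routing and monitoring.
	Kind() string
}

Message is the actor message interface. Every message passed between actors must implement it.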
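As a small illustration of the interface, the sketch below defines two message types and tallies them by Kind, the sort of thing a monitoring hook might do. Message is re-declared locally; Deposit, Withdraw, their kind strings and countByKind are hypothetical.

```go
package main

import "fmt"

// Message mirrors the documented interface.
type Message interface {
	Kind() string
}

// Deposit and Withdraw are hypothetical message types for this example.
type Deposit struct{ Amount int }

func (Deposit) Kind() string { return "account.deposit" }

type Withdraw struct{ Amount int }

func (Withdraw) Kind() string { return "account.withdraw" }

// countByKind tallies messages per Kind.
func countByKind(msgs []Message) map[string]int {
	counts := make(map[string]int)
	for _, m := range msgs {
		counts[m.Kind()]++
	}
	return counts
}

func main() {
	msgs := []Message{Deposit{10}, Withdraw{3}, Deposit{5}}
	fmt.Println(countByKind(msgs)) // prints map[account.deposit:2 account.withdraw:1]
}
```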

type OneForOneStrategy

type OneForOneStrategy struct {
	MaxRestarts    int           // maximum number of restarts
	WithinDuration time.Duration // time window
	Decider        Decider       // decision function
	// contains filtered or unexported fields
}

OneForOneStrategy is the one-for-one strategy: only the failed actor is restarted, and other child actors are unaffected.

func NewOneForOneStrategy

func NewOneForOneStrategy(maxRestarts int, within time.Duration, decider Decider) *OneForOneStrategy

NewOneForOneStrategy creates a one-for-one strategy.

func (*OneForOneStrategy) HandleFailure

func (s *OneForOneStrategy) HandleFailure(system *System, child *PID, msg Message, err interface{}) interface{}

HandleFailure implements SupervisorStrategy.
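MaxRestarts and WithinDuration together describe a restart budget over a time window. How the package tracks that budget is not shown on this page; the sketch below uses a sliding window of timestamps as one plausible reading. restartWindow, allow and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// restartWindow permits at most maxRestarts restarts in any span of
// length within.
type restartWindow struct {
	maxRestarts int
	within      time.Duration
	stamps      []time.Time // times of restarts still inside the window
}

// allow drops timestamps that have aged out of the window, then records
// and permits the restart if the budget is not exhausted.
func (w *restartWindow) allow(now time.Time) bool {
	cutoff := now.Add(-w.within)
	kept := w.stamps[:0]
	for _, t := range w.stamps {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	w.stamps = kept
	if len(w.stamps) >= w.maxRestarts {
		return false
	}
	w.stamps = append(w.stamps, now)
	return true
}

// demo tries five restarts with a budget of 3 per minute: four in quick
// succession, then one after the window has moved on.
func demo() []bool {
	w := &restartWindow{maxRestarts: 3, within: time.Minute}
	t0 := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	offsets := []time.Duration{0, time.Second, 2 * time.Second, 3 * time.Second, 90 * time.Second}
	results := make([]bool, 0, len(offsets))
	for _, off := range offsets {
		results = append(results, w.allow(t0.Add(off)))
	}
	return results
}

func main() {
	fmt.Println(demo()) // prints [true true true false true]
}
```

A strategy built on such a counter would typically turn a refused restart into a stop or an escalation, but which one this package chooses is not stated here.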

type PID

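type PID struct {
	// ID is the actor's unique identifier (local).
	ID string
	// Address is the network address; it is empty for local actors.
	// Format: "host:port", intended for a future distributed extension.
	Address string
	// contains filtered or unexported fields
}

PID (Process ID) is the actor process identifier. Like a PID in Erlang, it is the only way to address an actor.

func (*PID) Request

func (p *PID) Request(msg Message, timeout time.Duration) (Message, error)

Request sends a request and waits for the response (synchronous call).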
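A synchronous request with a timeout is commonly built from a per-request reply channel and a timer. The sketch below shows that general technique with plain channels; it is not the package's implementation, and request, ask, startUpper and errTimeout are hypothetical names (the package reports timeouts with its own ResponseTimeout type).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

var errTimeout = errors.New("request timed out")

// request pairs a payload with the channel on which the answer is expected.
type request struct {
	payload string
	reply   chan string
}

// ask enqueues a request and waits for the answer. One timer covers both
// the enqueue and the wait. The reply channel is buffered so a late
// responder never blocks after the caller has given up.
func ask(mailbox chan<- request, payload string, timeout time.Duration) (string, error) {
	req := request{payload: payload, reply: make(chan string, 1)}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case mailbox <- req:
	case <-timer.C:
		return "", errTimeout
	}
	select {
	case resp := <-req.reply:
		return resp, nil
	case <-timer.C:
		return "", errTimeout
	}
}

// startUpper launches a goroutine that answers each request with its
// payload upper-cased, until the mailbox is closed.
func startUpper() chan<- request {
	mailbox := make(chan request, 8)
	go func() {
		for req := range mailbox {
			req.reply <- strings.ToUpper(req.payload)
		}
	}()
	return mailbox
}

// demo performs one answered request and one that nobody serves.
func demo() (string, error, error) {
	mailbox := startUpper()
	defer close(mailbox)
	resp, err := ask(mailbox, "ping", time.Second)
	_, timeoutErr := ask(make(chan request), "ping", 10*time.Millisecond)
	return resp, err, timeoutErr
}

func main() {
	resp, err, timeoutErr := demo()
	fmt.Println(resp, err, timeoutErr) // prints PING <nil> request timed out
}
```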

func (*PID) String

func (p *PID) String() string

String returns the string representation of the PID.

func (*PID) Tell

func (p *PID) Tell(msg Message)

Tell sends a message (fire-and-forget).

type PoisonPill

type PoisonPill struct{}

PoisonPill is the poison-pill message; it stops an actor gracefully.
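The usual meaning of a poison pill is that messages queued ahead of it are still handled and the actor stops when it reaches the pill. The sketch below shows that idea with a plain channel as the mailbox. Whether this package follows exactly these semantics is an assumption, as is the "poison_pill" kind string; Text, drain and demo are hypothetical.

```go
package main

import "fmt"

// Local stand-ins for the documented declarations.
type Message interface{ Kind() string }

type PoisonPill struct{}

// Kind returns an assumed identifier; the real value is not documented here.
func (*PoisonPill) Kind() string { return "poison_pill" }

// Text is a hypothetical ordinary message.
type Text struct{ Body string }

func (Text) Kind() string { return "text" }

// drain handles messages in order and stops at the first PoisonPill.
// It returns the bodies of the Text messages handled before the pill.
func drain(mailbox <-chan Message) []string {
	var handled []string
	for msg := range mailbox {
		if _, stop := msg.(*PoisonPill); stop {
			break
		}
		if t, ok := msg.(Text); ok {
			handled = append(handled, t.Body)
		}
	}
	return handled
}

// demo queues two messages, a pill, and one more message.
func demo() []string {
	mailbox := make(chan Message, 4)
	mailbox <- Text{Body: "first"}
	mailbox <- Text{Body: "second"}
	mailbox <- &PoisonPill{}
	mailbox <- Text{Body: "never handled"}
	close(mailbox)
	return drain(mailbox)
}

func main() {
	fmt.Println(demo()) // prints [first second]
}
```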

func (*PoisonPill) Kind

func (p *PoisonPill) Kind() string

type Props

type Props struct {
	// Name is the actor name.
	Name string
	// MailboxSize is the mailbox size.
	MailboxSize int
	// Dispatcher is the dispatcher type.
	Dispatcher DispatcherType
	// SupervisorStrategy is the supervision strategy.
	SupervisorStrategy SupervisorStrategy
}

Props holds the configuration properties of an actor.

func DefaultProps

func DefaultProps(name string) *Props

DefaultProps returns the default props.

type ResponseTimeout

type ResponseTimeout struct {
	Target  *PID
	Timeout time.Duration
}

ResponseTimeout is the response timeout error.

func (*ResponseTimeout) Error

func (r *ResponseTimeout) Error() string

func (*ResponseTimeout) Kind

func (r *ResponseTimeout) Kind() string

type Restarting

type Restarting struct{}

Restarting is the message indicating that an actor is restarting.

func (*Restarting) Kind

func (r *Restarting) Kind() string

type Started

type Started struct{}

Started is the message indicating that an actor has finished starting.

func (*Started) Kind

func (s *Started) Kind() string

type Stopped

type Stopped struct{}

Stopped is the message indicating that an actor has stopped.

func (*Stopped) Kind

func (s *Stopped) Kind() string

type Stopping

type Stopping struct{}

Stopping is the message indicating that an actor is stopping.

func (*Stopping) Kind

func (s *Stopping) Kind() string

type SupervisorActor

type SupervisorActor struct {
	// contains filtered or unexported fields
}

SupervisorActor is a supervisor actor, used to build supervision trees.

func NewSupervisorActor

func NewSupervisorActor(config *SupervisorConfig) *SupervisorActor

NewSupervisorActor creates a supervisor actor.

func (*SupervisorActor) GetChild

func (s *SupervisorActor) GetChild(name string) *PID

GetChild returns the child actor with the given name.

func (*SupervisorActor) Receive

func (s *SupervisorActor) Receive(ctx *Context, msg Message)

Receive handles messages.

type SupervisorConfig

type SupervisorConfig struct {
	Strategy SupervisorStrategy
	Children []ChildSpec
}

SupervisorConfig is the supervisor configuration.

type SupervisorStrategy

type SupervisorStrategy interface {
	// HandleFailure handles an actor failure.
	// It returns the directive to apply, which may be a Directive or a DirectiveWithDelay.
	HandleFailure(system *System, child *PID, msg Message, err interface{}) interface{}
}

SupervisorStrategy is the supervision strategy interface.
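Because HandleFailure returns interface{} that may hold either a Directive or a DirectiveWithDelay, a caller has to inspect the dynamic type. A type switch such as the one sketched below is one way to do that. The types are re-declared locally; resolve and demo are hypothetical, accepting a pointer form is a precaution rather than documented behavior, and the fallback to DirectiveStop for unknown results is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented declarations.
type Directive int

const (
	DirectiveResume Directive = iota
	DirectiveRestart
	DirectiveStop
	DirectiveEscalate
	DirectiveRestartAfter
)

type DirectiveWithDelay struct {
	Directive Directive
	Delay     time.Duration
}

// resolve flattens a HandleFailure result into a directive and a delay.
func resolve(result interface{}) (Directive, time.Duration) {
	switch r := result.(type) {
	case Directive:
		return r, 0
	case DirectiveWithDelay:
		return r.Directive, r.Delay
	case *DirectiveWithDelay:
		if r != nil {
			return r.Directive, r.Delay
		}
	}
	// Unknown or nil result: stopping is an assumed safe default.
	return DirectiveStop, 0
}

// demo resolves a plain directive, a delayed directive and an unexpected value.
func demo() []string {
	results := []interface{}{
		DirectiveRestart,
		DirectiveWithDelay{Directive: DirectiveRestartAfter, Delay: 2 * time.Second},
		"unexpected",
	}
	out := make([]string, 0, len(results))
	for _, r := range results {
		d, delay := resolve(r)
		out = append(out, fmt.Sprintf("%d after %s", d, delay))
	}
	return out
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```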

func DefaultSupervisorStrategy

func DefaultSupervisorStrategy() SupervisorStrategy

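DefaultSupervisorStrategy returns the default supervision strategy, which allows 3 restarts within 1 minute.

func LenientSupervisorStrategy

func LenientSupervisorStrategy() SupervisorStrategy

LenientSupervisorStrategy returns a lenient supervision strategy that allows more restarts.

func StrictSupervisorStrategy

func StrictSupervisorStrategy() SupervisorStrategy

StrictSupervisorStrategy returns a strict supervision strategy that stops the actor on any failure.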

type System

type System struct {
	// contains filtered or unexported fields
}

System is the actor system. It manages the lifecycle, message routing, and supervision of all actors.

func NewSystem

func NewSystem(name string) *System

NewSystem creates a new actor system.

func NewSystemWithConfig

func NewSystemWithConfig(name string, config *SystemConfig) *System

NewSystemWithConfig creates an actor system with the given configuration.

func (*System) GetActor

func (s *System) GetActor(name string) (*PID, bool)

GetActor returns the actor with the given name (for debugging).

func (*System) ListActors

func (s *System) ListActors() []*PID

ListActors lists all actors (for debugging).

func (*System) Name

func (s *System) Name() string

Name returns the system name.

func (*System) Request

func (s *System) Request(target *PID, msg Message, timeout time.Duration) (Message, error)

Request sends a synchronous request and waits for the response.

func (*System) Send

func (s *System) Send(target *PID, msg Message)

Send sends a message without a sender.

func (*System) SendWithSender

func (s *System) SendWithSender(target *PID, msg Message, sender *PID)

SendWithSender sends a message with a sender.

func (*System) Shutdown

func (s *System) Shutdown()

Shutdown shuts down the entire actor system.

func (*System) ShutdownWithTimeout

func (s *System) ShutdownWithTimeout(timeout time.Duration)

ShutdownWithTimeout shuts down the system with a timeout.

func (*System) Spawn

func (s *System) Spawn(actor Actor, name string) *PID

Spawn creates an actor.

func (*System) SpawnWithProps

func (s *System) SpawnWithProps(actor Actor, props *Props) *PID

SpawnWithProps creates an actor with the given props.

func (*System) Stats

func (s *System) Stats() *SystemStats

Stats returns the system statistics.

func (*System) Stop

func (s *System) Stop(pid *PID)

Stop stops an actor.

func (*System) StopGracefully

func (s *System) StopGracefully(pid *PID, timeout time.Duration) error

StopGracefully stops an actor gracefully, waiting for it to finish processing the current message.

type SystemConfig

type SystemConfig struct {
	// MailboxSize is the global mailbox size.
	MailboxSize int
	// DeadLetterSize is the size of the dead-letter queue.
	DeadLetterSize int
	// DefaultActorMailboxSize is the default mailbox size for an actor.
	DefaultActorMailboxSize int
	// EnableDeadLetterLogging controls whether dead letters are logged.
	EnableDeadLetterLogging bool
	// PanicHandler is the function that handles panics.
	PanicHandler func(actor *PID, msg Message, err interface{})
}

SystemConfig is the system configuration.
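A panic hook like PanicHandler is normally fed by a deferred recover around the message handler. The sketch below shows that general Go technique with simplified signatures (strings instead of *PID and Message). When and how the System actually invokes PanicHandler is not documented on this page, so treat the wiring as an assumption; safeCall and demo are hypothetical.

```go
package main

import "fmt"

// safeCall runs handle(msg) and reports a recovered panic to onPanic
// instead of letting it crash the goroutine.
func safeCall(handle func(msg string), msg string, onPanic func(msg string, err interface{})) {
	defer func() {
		if r := recover(); r != nil {
			onPanic(msg, r)
		}
	}()
	handle(msg)
}

// demo processes three messages, the second of which panics, and returns
// a log showing that processing continued afterwards.
func demo() []string {
	var log []string
	handle := func(msg string) {
		if msg == "bad" {
			panic("cannot process " + msg)
		}
		log = append(log, "ok:"+msg)
	}
	onPanic := func(msg string, err interface{}) {
		log = append(log, fmt.Sprintf("panic on %q: %v", msg, err))
	}
	for _, m := range []string{"a", "bad", "b"} {
		safeCall(handle, m, onPanic)
	}
	return log
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

The recovered value has type interface{}, which matches the err parameter in the documented PanicHandler signature.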

func DefaultSystemConfig

func DefaultSystemConfig() *SystemConfig

DefaultSystemConfig returns the default system configuration.

type SystemStats

type SystemStats struct {
	TotalActors    int64
	TotalMessages  int64
	DeadLetters    int64
	ProcessedMsgs  int64
	AverageLatency time.Duration
	StartTime      time.Time
}

SystemStats holds the system statistics.

type Terminated

type Terminated struct {
	Who *PID
}

Terminated is the notification that an actor has terminated.

func (*Terminated) Kind

func (t *Terminated) Kind() string

type Unwatch

type Unwatch struct {
	Watcher *PID
}

Unwatch cancels a watch.

func (*Unwatch) Kind

func (u *Unwatch) Kind() string

type Watch

type Watch struct {
	Watcher *PID
}

Watch is a watch request.

func (*Watch) Kind

func (w *Watch) Kind() string
