Documentation
¶
Overview ¶
Package actors provides actor model support for the workflow engine via goakt v4. It enables stateful long-lived entities, structured fault recovery, and message-driven workflows alongside existing pipeline-based workflows.
Index ¶
- func NewActorAskStepFactory() module.StepFactory
- func NewActorSendStepFactory() module.StepFactory
- type ActorAskStep
- type ActorMessage
- type ActorPoolModule
- func (m *ActorPoolModule) GetGrainIdentity(ctx context.Context, identity string) (*actor.GrainIdentity, error)
- func (m *ActorPoolModule) Init(app modular.Application) error
- func (m *ActorPoolModule) Mode() string
- func (m *ActorPoolModule) Name() string
- func (m *ActorPoolModule) RequiresServices() []modular.ServiceDependency
- func (m *ActorPoolModule) Routing() string
- func (m *ActorPoolModule) RoutingKey() string
- func (m *ActorPoolModule) SelectActor(msg *ActorMessage) ([]*actor.PID, error)
- func (m *ActorPoolModule) SetHandlers(handlers map[string]*HandlerPipeline)
- func (m *ActorPoolModule) SetStepRegistry(registry *module.StepRegistry, app modular.Application)
- func (m *ActorPoolModule) Start(ctx context.Context) error
- func (m *ActorPoolModule) Stop(_ context.Context) error
- func (m *ActorPoolModule) SystemName() string
- type ActorSendStep
- type ActorSystemModule
- func (m *ActorSystemModule) ActorSystem() actor.ActorSystem
- func (m *ActorSystemModule) DefaultSupervisor() *supervisor.Supervisor
- func (m *ActorSystemModule) Init(app modular.Application) error
- func (m *ActorSystemModule) Name() string
- func (m *ActorSystemModule) Start(ctx context.Context) error
- func (m *ActorSystemModule) Stop(ctx context.Context) error
- type ActorWorkflowHandler
- func (h *ActorWorkflowHandler) CanHandle(workflowType string) bool
- func (h *ActorWorkflowHandler) ConfigureWorkflow(_ modular.Application, workflowConfig any) error
- func (h *ActorWorkflowHandler) ExecuteWorkflow(_ context.Context, _ string, _ string, _ map[string]any) (map[string]any, error)
- func (h *ActorWorkflowHandler) PoolHandlers() map[string]map[string]*HandlerPipeline
- func (h *ActorWorkflowHandler) SetLogger(logger *slog.Logger)
- type BridgeActor
- type BridgeGrain
- type HandlerPipeline
- type Plugin
- func (p *Plugin) Capabilities() []capability.Contract
- func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory
- func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema
- func (p *Plugin) SetLogger(logger *slog.Logger)
- func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider)
- func (p *Plugin) StepFactories() map[string]plugin.StepFactory
- func (p *Plugin) StepSchemas() []*schema.StepSchema
- func (p *Plugin) WiringHooks() []plugin.WiringHook
- func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewActorAskStepFactory ¶
func NewActorAskStepFactory() module.StepFactory
NewActorAskStepFactory returns a factory for step.actor_ask.
func NewActorSendStepFactory ¶
func NewActorSendStepFactory() module.StepFactory
NewActorSendStepFactory returns a factory for step.actor_send.
Types ¶
type ActorAskStep ¶
type ActorAskStep struct {
// contains filtered or unexported fields
}
ActorAskStep sends a message to an actor and waits for a response (Ask).
func (*ActorAskStep) Execute ¶
func (s *ActorAskStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error)
Execute sends a request-response message to an actor and returns the response.
type ActorMessage ¶
type ActorMessage struct {
// Type identifies which handler pipeline to invoke.
Type string `json:"type" cbor:"type"`
// Payload is the data passed to the handler pipeline as trigger data.
Payload map[string]any `json:"payload" cbor:"payload"`
}
ActorMessage is the standard message envelope for actor communication. All messages sent to bridge actors use this type.
type ActorPoolModule ¶
type ActorPoolModule struct {
// contains filtered or unexported fields
}
ActorPoolModule defines a group of actors with shared behavior, routing, and recovery.
func NewActorPoolModule ¶
func NewActorPoolModule(name string, cfg map[string]any) (*ActorPoolModule, error)
NewActorPoolModule creates a new actor pool module from config.
func (*ActorPoolModule) GetGrainIdentity ¶
func (m *ActorPoolModule) GetGrainIdentity(ctx context.Context, identity string) (*actor.GrainIdentity, error)
GetGrainIdentity retrieves or activates a grain for the given identity. The grain system handles lifecycle automatically: it activates on first use and passivates after idleTimeout of inactivity.
func (*ActorPoolModule) Init ¶
func (m *ActorPoolModule) Init(app modular.Application) error
Init resolves the actor.system module reference.
func (*ActorPoolModule) Mode ¶
func (m *ActorPoolModule) Mode() string
Mode returns the lifecycle mode.
func (*ActorPoolModule) Name ¶
func (m *ActorPoolModule) Name() string
Name returns the module name.
func (*ActorPoolModule) RequiresServices ¶
func (m *ActorPoolModule) RequiresServices() []modular.ServiceDependency
RequiresServices declares dependencies so the modular framework orders Init correctly.
func (*ActorPoolModule) Routing ¶
func (m *ActorPoolModule) Routing() string
Routing returns the routing strategy.
func (*ActorPoolModule) RoutingKey ¶
func (m *ActorPoolModule) RoutingKey() string
RoutingKey returns the sticky routing key.
func (*ActorPoolModule) SelectActor ¶
func (m *ActorPoolModule) SelectActor(msg *ActorMessage) ([]*actor.PID, error)
SelectActor picks one or more PIDs from the permanent pool based on the routing strategy. For broadcast, returns all PIDs. For other strategies, returns a single PID. The msg parameter is used for sticky routing to extract the routing key.
func (*ActorPoolModule) SetHandlers ¶
func (m *ActorPoolModule) SetHandlers(handlers map[string]*HandlerPipeline)
SetHandlers sets the message receive handlers (called by the actor workflow handler).
func (*ActorPoolModule) SetStepRegistry ¶
func (m *ActorPoolModule) SetStepRegistry(registry *module.StepRegistry, app modular.Application)
SetStepRegistry injects the step registry and app for actor spawn-time pipeline building.
func (*ActorPoolModule) Start ¶
func (m *ActorPoolModule) Start(ctx context.Context) error
Start spawns actors in the pool.
func (*ActorPoolModule) Stop ¶
func (m *ActorPoolModule) Stop(_ context.Context) error
Stop is a no-op — actors are stopped when the ActorSystem shuts down.
func (*ActorPoolModule) SystemName ¶
func (m *ActorPoolModule) SystemName() string
SystemName returns the referenced actor.system module name.
type ActorSendStep ¶
type ActorSendStep struct {
// contains filtered or unexported fields
}
ActorSendStep sends a fire-and-forget message to an actor (Tell).
func (*ActorSendStep) Execute ¶
func (s *ActorSendStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error)
Execute sends a fire-and-forget message to an actor pool.
type ActorSystemModule ¶
type ActorSystemModule struct {
// contains filtered or unexported fields
}
ActorSystemModule wraps a goakt ActorSystem as a workflow engine module.
func NewActorSystemModule ¶
func NewActorSystemModule(name string, cfg map[string]any) (*ActorSystemModule, error)
NewActorSystemModule creates a new actor system module from config.
func (*ActorSystemModule) ActorSystem ¶
func (m *ActorSystemModule) ActorSystem() actor.ActorSystem
ActorSystem returns the underlying goakt ActorSystem.
func (*ActorSystemModule) DefaultSupervisor ¶
func (m *ActorSystemModule) DefaultSupervisor() *supervisor.Supervisor
DefaultSupervisor returns the default supervisor for pools that don't specify their own.
func (*ActorSystemModule) Init ¶
func (m *ActorSystemModule) Init(app modular.Application) error
Init registers the module in the service registry.
func (*ActorSystemModule) Name ¶
func (m *ActorSystemModule) Name() string
Name returns the module name.
type ActorWorkflowHandler ¶
type ActorWorkflowHandler struct {
// contains filtered or unexported fields
}
ActorWorkflowHandler handles the "actors" workflow type. It parses receive handler configs and wires them to actor pool modules.
func NewActorWorkflowHandler ¶
func NewActorWorkflowHandler() *ActorWorkflowHandler
NewActorWorkflowHandler creates a new actor workflow handler.
func (*ActorWorkflowHandler) CanHandle ¶
func (h *ActorWorkflowHandler) CanHandle(workflowType string) bool
CanHandle returns true for "actors" workflow type.
func (*ActorWorkflowHandler) ConfigureWorkflow ¶
func (h *ActorWorkflowHandler) ConfigureWorkflow(_ modular.Application, workflowConfig any) error
ConfigureWorkflow parses the actors workflow config.
func (*ActorWorkflowHandler) ExecuteWorkflow ¶
func (h *ActorWorkflowHandler) ExecuteWorkflow(_ context.Context, _ string, _ string, _ map[string]any) (map[string]any, error)
ExecuteWorkflow is not used directly — actors receive messages via step.actor_send/ask.
func (*ActorWorkflowHandler) PoolHandlers ¶
func (h *ActorWorkflowHandler) PoolHandlers() map[string]map[string]*HandlerPipeline
PoolHandlers returns the parsed handlers for wiring to actor pools.
func (*ActorWorkflowHandler) SetLogger ¶
func (h *ActorWorkflowHandler) SetLogger(logger *slog.Logger)
SetLogger sets the logger.
type BridgeActor ¶
type BridgeActor struct {
// contains filtered or unexported fields
}
BridgeActor is a goakt Actor (PreStart/Receive/PostStop) used for permanent pools. It executes workflow step pipelines when it receives messages.
func NewBridgeActor ¶
func NewBridgeActor(poolName, identity string, handlers map[string]*HandlerPipeline, registry *module.StepRegistry, app modular.Application, logger *slog.Logger) *BridgeActor
NewBridgeActor creates a BridgeActor ready to be spawned into a permanent pool.
func (*BridgeActor) PostStop ¶
func (a *BridgeActor) PostStop(_ *goaktactor.Context) error
PostStop cleans up the actor.
func (*BridgeActor) PreStart ¶
func (a *BridgeActor) PreStart(_ *goaktactor.Context) error
PreStart initializes the actor.
func (*BridgeActor) Receive ¶
func (a *BridgeActor) Receive(ctx *goaktactor.ReceiveContext)
Receive handles incoming messages by dispatching to the appropriate handler pipeline.
func (*BridgeActor) State ¶
func (a *BridgeActor) State() map[string]any
State returns a copy of the actor's current internal state (for testing/inspection).
type BridgeGrain ¶
type BridgeGrain struct {
// contains filtered or unexported fields
}
BridgeGrain is a goakt Grain (OnActivate/OnReceive/OnDeactivate) used for auto-managed pools. Grains are virtual actors: activated on first message, passivated after idleTimeout.
func (*BridgeGrain) OnActivate ¶
func (g *BridgeGrain) OnActivate(_ context.Context, _ *goaktactor.GrainProps) error
OnActivate initializes grain state when the grain is loaded into memory.
func (*BridgeGrain) OnDeactivate ¶
func (g *BridgeGrain) OnDeactivate(_ context.Context, _ *goaktactor.GrainProps) error
OnDeactivate is called when the grain is passivated (idle timeout reached).
func (*BridgeGrain) OnReceive ¶
func (g *BridgeGrain) OnReceive(ctx *goaktactor.GrainContext)
OnReceive dispatches an ActorMessage to the matching handler pipeline.
type HandlerPipeline ¶
type HandlerPipeline struct {
// Description is an optional human-readable description.
Description string
// Steps is an ordered list of step configs (each is a map with "type" and other fields).
Steps []map[string]any
}
HandlerPipeline defines a message handler as an ordered set of step configs.
type Plugin ¶
type Plugin struct {
plugin.BaseEnginePlugin
// contains filtered or unexported fields
}
Plugin provides actor model support for the workflow engine.
func (*Plugin) Capabilities ¶
func (p *Plugin) Capabilities() []capability.Contract
Capabilities returns the plugin's capability contracts.
func (*Plugin) ModuleFactories ¶
func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory
ModuleFactories returns actor module factories.
func (*Plugin) ModuleSchemas ¶
func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema
ModuleSchemas returns schemas for actor modules.
func (*Plugin) SetStepRegistry ¶
func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider)
SetStepRegistry is called by the engine to inject the step registry.
func (*Plugin) StepFactories ¶
func (p *Plugin) StepFactories() map[string]plugin.StepFactory
StepFactories returns actor step factories.
func (*Plugin) StepSchemas ¶
func (p *Plugin) StepSchemas() []*schema.StepSchema
StepSchemas returns schemas for actor steps.
func (*Plugin) WiringHooks ¶
func (p *Plugin) WiringHooks() []plugin.WiringHook
WiringHooks returns hooks to wire actor handlers to pool modules.
func (*Plugin) WorkflowHandlers ¶
func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory
WorkflowHandlers returns the actor workflow handler factory.