actors

package
v0.3.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package actors provides actor model support for the workflow engine via goakt v4. It enables stateful long-lived entities, structured fault recovery, and message-driven workflows alongside existing pipeline-based workflows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewActorAskStepFactory

func NewActorAskStepFactory() module.StepFactory

NewActorAskStepFactory returns a factory for step.actor_ask.

func NewActorSendStepFactory

func NewActorSendStepFactory() module.StepFactory

NewActorSendStepFactory returns a factory for step.actor_send.

Types

type ActorAskStep

type ActorAskStep struct {
	// contains filtered or unexported fields
}

ActorAskStep sends a message to an actor and waits for a response (Ask).

func (*ActorAskStep) Execute

Execute sends a request-response message to an actor and returns the response.

func (*ActorAskStep) Name

func (s *ActorAskStep) Name() string

Name returns the step name.

type ActorMessage

type ActorMessage struct {
	// Type identifies which handler pipeline to invoke.
	Type string `json:"type" cbor:"type"`
	// Payload is the data passed to the handler pipeline as trigger data.
	Payload map[string]any `json:"payload" cbor:"payload"`
}

ActorMessage is the standard message envelope for actor communication. All messages sent to bridge actors use this type.

type ActorPoolModule

type ActorPoolModule struct {
	// contains filtered or unexported fields
}

ActorPoolModule defines a group of actors with shared behavior, routing, and recovery.

func NewActorPoolModule

func NewActorPoolModule(name string, cfg map[string]any) (*ActorPoolModule, error)

NewActorPoolModule creates a new actor pool module from config.

func (*ActorPoolModule) GetGrainIdentity

func (m *ActorPoolModule) GetGrainIdentity(ctx context.Context, identity string) (*actor.GrainIdentity, error)

GetGrainIdentity retrieves or activates a grain for the given identity. The grain system handles lifecycle automatically: it activates on first use and passivates after idleTimeout of inactivity.

func (*ActorPoolModule) Init

Init resolves the actor.system module reference.

func (*ActorPoolModule) Mode

func (m *ActorPoolModule) Mode() string

Mode returns the lifecycle mode.

func (*ActorPoolModule) Name

func (m *ActorPoolModule) Name() string

Name returns the module name.

func (*ActorPoolModule) RequiresServices

func (m *ActorPoolModule) RequiresServices() []modular.ServiceDependency

RequiresServices declares dependencies so the modular framework orders Init correctly.

func (*ActorPoolModule) Routing

func (m *ActorPoolModule) Routing() string

Routing returns the routing strategy.

func (*ActorPoolModule) RoutingKey

func (m *ActorPoolModule) RoutingKey() string

RoutingKey returns the sticky routing key.

func (*ActorPoolModule) SelectActor

func (m *ActorPoolModule) SelectActor(msg *ActorMessage) ([]*actor.PID, error)

SelectActor picks one or more PIDs from the permanent pool based on the routing strategy. For broadcast, returns all PIDs. For other strategies, returns a single PID. The msg parameter is used for sticky routing to extract the routing key.

func (*ActorPoolModule) SetHandlers

func (m *ActorPoolModule) SetHandlers(handlers map[string]*HandlerPipeline)

SetHandlers sets the message receive handlers (called by the actor workflow handler).

func (*ActorPoolModule) SetStepRegistry

func (m *ActorPoolModule) SetStepRegistry(registry *module.StepRegistry, app modular.Application)

SetStepRegistry injects the step registry and app for actor spawn-time pipeline building.

func (*ActorPoolModule) Start

func (m *ActorPoolModule) Start(ctx context.Context) error

Start spawns actors in the pool.

func (*ActorPoolModule) Stop

func (m *ActorPoolModule) Stop(_ context.Context) error

Stop is a no-op — actors are stopped when the ActorSystem shuts down.

func (*ActorPoolModule) SystemName

func (m *ActorPoolModule) SystemName() string

SystemName returns the referenced actor.system module name.

type ActorSendStep

type ActorSendStep struct {
	// contains filtered or unexported fields
}

ActorSendStep sends a fire-and-forget message to an actor (Tell).

func (*ActorSendStep) Execute

Execute sends a fire-and-forget message to an actor pool.

func (*ActorSendStep) Name

func (s *ActorSendStep) Name() string

Name returns the step name.

type ActorSystemModule

type ActorSystemModule struct {
	// contains filtered or unexported fields
}

ActorSystemModule wraps a goakt ActorSystem as a workflow engine module.

func NewActorSystemModule

func NewActorSystemModule(name string, cfg map[string]any) (*ActorSystemModule, error)

NewActorSystemModule creates a new actor system module from config.

func (*ActorSystemModule) ActorSystem

func (m *ActorSystemModule) ActorSystem() actor.ActorSystem

ActorSystem returns the underlying goakt ActorSystem.

func (*ActorSystemModule) DefaultSupervisor

func (m *ActorSystemModule) DefaultSupervisor() *supervisor.Supervisor

DefaultSupervisor returns the default supervisor for pools that don't specify their own.

func (*ActorSystemModule) Init

Init registers the module in the service registry.

func (*ActorSystemModule) Name

func (m *ActorSystemModule) Name() string

Name returns the module name.

func (*ActorSystemModule) Start

func (m *ActorSystemModule) Start(ctx context.Context) error

Start creates and starts the goakt ActorSystem.

func (*ActorSystemModule) Stop

func (m *ActorSystemModule) Stop(ctx context.Context) error

Stop gracefully shuts down the actor system.

type ActorWorkflowHandler

type ActorWorkflowHandler struct {
	// contains filtered or unexported fields
}

ActorWorkflowHandler handles the "actors" workflow type. It parses receive handler configs and wires them to actor pool modules.

func NewActorWorkflowHandler

func NewActorWorkflowHandler() *ActorWorkflowHandler

NewActorWorkflowHandler creates a new actor workflow handler.

func (*ActorWorkflowHandler) CanHandle

func (h *ActorWorkflowHandler) CanHandle(workflowType string) bool

CanHandle returns true for "actors" workflow type.

func (*ActorWorkflowHandler) ConfigureWorkflow

func (h *ActorWorkflowHandler) ConfigureWorkflow(_ modular.Application, workflowConfig any) error

ConfigureWorkflow parses the actors workflow config.

func (*ActorWorkflowHandler) ExecuteWorkflow

func (h *ActorWorkflowHandler) ExecuteWorkflow(_ context.Context, _ string, _ string, _ map[string]any) (map[string]any, error)

ExecuteWorkflow is not used directly — actors receive messages via step.actor_send/ask.

func (*ActorWorkflowHandler) PoolHandlers

func (h *ActorWorkflowHandler) PoolHandlers() map[string]map[string]*HandlerPipeline

PoolHandlers returns the parsed handlers for wiring to actor pools.

func (*ActorWorkflowHandler) SetLogger

func (h *ActorWorkflowHandler) SetLogger(logger *slog.Logger)

SetLogger sets the logger.

type BridgeActor

type BridgeActor struct {
	// contains filtered or unexported fields
}

BridgeActor is a goakt Actor (PreStart/Receive/PostStop) used for permanent pools. It executes workflow step pipelines when it receives messages.

func NewBridgeActor

func NewBridgeActor(poolName, identity string, handlers map[string]*HandlerPipeline, registry *module.StepRegistry, app modular.Application, logger *slog.Logger) *BridgeActor

NewBridgeActor creates a BridgeActor ready to be spawned into a permanent pool.

func (*BridgeActor) PostStop

func (a *BridgeActor) PostStop(_ *goaktactor.Context) error

PostStop cleans up the actor.

func (*BridgeActor) PreStart

func (a *BridgeActor) PreStart(_ *goaktactor.Context) error

PreStart initializes the actor.

func (*BridgeActor) Receive

func (a *BridgeActor) Receive(ctx *goaktactor.ReceiveContext)

Receive handles incoming messages by dispatching to the appropriate handler pipeline.

func (*BridgeActor) State

func (a *BridgeActor) State() map[string]any

State returns a copy of the actor's current internal state (for testing/inspection).

type BridgeGrain

type BridgeGrain struct {
	// contains filtered or unexported fields
}

BridgeGrain is a goakt Grain (OnActivate/OnReceive/OnDeactivate) used for auto-managed pools. Grains are virtual actors: activated on first message, passivated after idleTimeout.

func (*BridgeGrain) OnActivate

func (g *BridgeGrain) OnActivate(_ context.Context, _ *goaktactor.GrainProps) error

OnActivate initializes grain state when the grain is loaded into memory.

func (*BridgeGrain) OnDeactivate

func (g *BridgeGrain) OnDeactivate(_ context.Context, _ *goaktactor.GrainProps) error

OnDeactivate is called when the grain is passivated (idle timeout reached).

func (*BridgeGrain) OnReceive

func (g *BridgeGrain) OnReceive(ctx *goaktactor.GrainContext)

OnReceive dispatches an ActorMessage to the matching handler pipeline.

type HandlerPipeline

type HandlerPipeline struct {
	// Description is an optional human-readable description.
	Description string
	// Steps is an ordered list of step configs (each is a map with "type" and other fields).
	Steps []map[string]any
}

HandlerPipeline defines a message handler as an ordered set of step configs.

type Plugin

type Plugin struct {
	plugin.BaseEnginePlugin
	// contains filtered or unexported fields
}

Plugin provides actor model support for the workflow engine.

func New

func New() *Plugin

New creates a new actors plugin.

func (*Plugin) Capabilities

func (p *Plugin) Capabilities() []capability.Contract

Capabilities returns the plugin's capability contracts.

func (*Plugin) ModuleFactories

func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory

ModuleFactories returns actor module factories.

func (*Plugin) ModuleSchemas

func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema

ModuleSchemas returns schemas for actor modules.

func (*Plugin) SetLogger

func (p *Plugin) SetLogger(logger *slog.Logger)

SetLogger is called by the engine to inject the logger.

func (*Plugin) SetStepRegistry

func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider)

SetStepRegistry is called by the engine to inject the step registry.

func (*Plugin) StepFactories

func (p *Plugin) StepFactories() map[string]plugin.StepFactory

StepFactories returns actor step factories.

func (*Plugin) StepSchemas

func (p *Plugin) StepSchemas() []*schema.StepSchema

StepSchemas returns schemas for actor steps.

func (*Plugin) WiringHooks

func (p *Plugin) WiringHooks() []plugin.WiringHook

WiringHooks returns hooks to wire actor handlers to pool modules.

func (*Plugin) WorkflowHandlers

func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory

WorkflowHandlers returns the actor workflow handler factory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL