actor

package
v0.0.0-...-5103540 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package actor provides an implementation of the actor model for concurrent message processing. Actors are concurrent entities that process messages sequentially through a mailbox (inbox channel). Each actor can handle requests and optionally return responses, with built-in panic recovery and Prometheus metrics integration for monitoring.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDeadActor is returned when attempting to interact with a stopped actor.
	ErrDeadActor = errors.New("actor is dead")
	// ErrActorPanic is returned when an actor's processor panics during message processing.
	ErrActorPanic = errors.New("panic in actor")
)
View Source
var ErrNoActors = errors.New("no actors provided")

Functions

This section is empty.

Types

type Actor

type Actor[Request, Response any] struct {
	// contains filtered or unexported fields
}

Actor is a concurrent entity that processes messages of type Request and produces responses of type Response. Actors are created using New and started with Run. Messages are processed sequentially through a mailbox.

func New

func New[Request, Response any](
	processorFactory func(ref *Ref[Request, Response]) Processor[Request, Response],
) *Actor[Request, Response]

New creates a new Actor with the given processor factory function. The factory is called when the actor is started via Run, receiving a reference to the actor which can be used to interact with other actors or itself.

func RoundRobin

func RoundRobin[Request, Response any](
	actors ...*Ref[Request, Response],
) (*Actor[Request, Response], error)

RoundRobin creates a new actor that uses a round-robin strategy to distribute incoming messages to a list of actors. It takes a variable number of actor references and returns a new actor that can be used to process requests. If no actors are provided, it returns an error.

func (*Actor[Request, Response]) Run

func (a *Actor[Request, Response]) Run(ctx context.Context, name string, depth int) *Ref[Request, Response]

Run starts the actor and returns a reference that can be used to send messages to it. The name parameter is used for logging and metrics. The depth parameter specifies the mailbox buffer size (0 for unbuffered). The actor runs until the context is canceled or Stop is called on the returned reference.

type Message

type Message[Request, Response any] struct {
	// Request is the data to be processed by the actor.
	Request Request
	// ResponseChan is an optional channel for receiving the response.
	// If nil, no response is expected (fire-and-forget).
	ResponseChan chan try.Try[Response]
}

Message represents a message sent to an actor, containing a request and an optional response channel. If ResponseChan is nil, the message is fire-and-forget. If provided, the actor will send the response (or error) to this channel after processing.

type Processor

type Processor[Request, Response any] interface {
	Process(msg Message[Request, Response])
}

Processor defines the interface for processing messages within an actor. Implementations must handle the message, optionally sending responses via the message's ResponseChan.

func NewProcessor

func NewProcessor[Request, Response any](processorFunc func(Message[Request, Response])) Processor[Request, Response]

NewProcessor creates a Processor from a function that processes messages. The function is responsible for handling response channels if present in the message.

func SimpleProcessor

func SimpleProcessor[Request, Response any](f func(req Request) (Response, error)) Processor[Request, Response]

SimpleProcessor creates a Processor from a simple request-response function. It automatically handles response channel management: sending the result or error to the response channel if present, or logging errors for fire-and-forget messages.

type Ref

type Ref[Request, Response any] struct {
	// contains filtered or unexported fields
}

Ref is a reference to a running actor. It provides methods to send messages, make requests, and control the actor's lifecycle.

func (*Ref[Request, Response]) Alive

func (r *Ref[Request, Response]) Alive() bool

Alive returns true if the actor is still running.

func (*Ref[Request, Response]) Name

func (r *Ref[Request, Response]) Name() string

Name returns the actor's name.

func (*Ref[Request, Response]) Publish

func (r *Ref[Request, Response]) Publish(message Message[Request, Response])

Publish sends a complete message to the actor without waiting for a response. Errors are logged but not returned. Uses context.Background().

func (*Ref[Request, Response]) PublishCtx

func (r *Ref[Request, Response]) PublishCtx(ctx context.Context, message Message[Request, Response])

PublishCtx sends a complete message to the actor without waiting for a response. Errors are logged but not returned. Respects the provided context for cancellation.

func (*Ref[Request, Response]) Request

func (r *Ref[Request, Response]) Request(request Request) (Response, error)

Request sends a request to the actor and blocks until a response is received. Uses context.Background(). Returns ErrDeadActor if the actor is stopped.

func (*Ref[Request, Response]) RequestCtx

func (r *Ref[Request, Response]) RequestCtx(ctx context.Context, request Request) (Response, error)

RequestCtx sends a request to the actor and blocks until a response is received or the context is canceled. Returns ErrDeadActor if the actor is stopped, or context error if context is canceled.

func (*Ref[Request, Response]) Send

func (r *Ref[Request, Response]) Send(request Request)

Send sends a request to the actor without waiting for a response. This is a fire-and-forget operation. Errors are logged but not returned. Uses context.Background().

func (*Ref[Request, Response]) SendCtx

func (r *Ref[Request, Response]) SendCtx(ctx context.Context, request Request)

SendCtx sends a request to the actor without waiting for a response. This is a fire-and-forget operation. Errors are logged but not returned. Respects the provided context for cancellation.

func (*Ref[Request, Response]) Stop

func (r *Ref[Request, Response]) Stop()

Stop signals the actor to shut down by closing its inbox channel. It is safe to call multiple times.

func (*Ref[Request, Response]) Wait

func (r *Ref[Request, Response]) Wait()

Wait blocks until the actor has fully stopped processing messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL