Documentation
¶
Overview ¶
Package actor provides an implementation of the actor model for concurrent message processing. Actors are concurrent entities that process messages sequentially through a mailbox (inbox channel). Each actor can handle requests and optionally return responses, with built-in panic recovery and Prometheus metrics integration for monitoring.
Index ¶
- Variables
- type Actor
- type Message
- type Processor
- type Ref
- func (r *Ref[Request, Response]) Alive() bool
- func (r *Ref[Request, Response]) Name() string
- func (r *Ref[Request, Response]) Publish(message Message[Request, Response])
- func (r *Ref[Request, Response]) PublishCtx(ctx context.Context, message Message[Request, Response])
- func (r *Ref[Request, Response]) Request(request Request) (Response, error)
- func (r *Ref[Request, Response]) RequestCtx(ctx context.Context, request Request) (Response, error)
- func (r *Ref[Request, Response]) Send(request Request)
- func (r *Ref[Request, Response]) SendCtx(ctx context.Context, request Request)
- func (r *Ref[Request, Response]) Stop()
- func (r *Ref[Request, Response]) Wait()
Constants ¶
This section is empty.
Variables ¶
var ( // ErrDeadActor is returned when attempting to interact with a stopped actor. ErrDeadActor = errors.New("actor is dead") // ErrActorPanic is returned when an actor's processor panics during message processing. ErrActorPanic = errors.New("panic in actor") )
var ErrNoActors = errors.New("no actors provided")
Functions ¶
This section is empty.
Types ¶
type Actor ¶
type Actor[Request, Response any] struct { // contains filtered or unexported fields }
Actor is a concurrent entity that processes messages of type Request and produces responses of type Response. Actors are created using New and started with Run. Messages are processed sequentially through a mailbox.
func New ¶
func New[Request, Response any]( processorFactory func(ref *Ref[Request, Response]) Processor[Request, Response], ) *Actor[Request, Response]
New creates a new Actor with the given processor factory function. The factory is called when the actor is started via Run, receiving a reference to the actor which can be used to interact with other actors or itself.
func RoundRobin ¶
func RoundRobin[Request, Response any]( actors ...*Ref[Request, Response], ) (*Actor[Request, Response], error)
RoundRobin creates a new actor that uses a round-robin strategy to distribute incoming messages to a list of actors. It takes a variable number of actor references and returns a new actor that can be used to process requests. If no actors are provided, it returns an error.
func (*Actor[Request, Response]) Run ¶
func (a *Actor[Request, Response]) Run(ctx context.Context, name string, depth int) *Ref[Request, Response]
Run starts the actor and returns a reference that can be used to send messages to it. The name parameter is used for logging and metrics. The depth parameter specifies the mailbox buffer size (0 for unbuffered). The actor runs until the context is canceled or Stop is called on the returned reference.
type Message ¶
type Message[Request, Response any] struct { // Request is the data to be processed by the actor. Request Request // ResponseChan is an optional channel for receiving the response. // If nil, no response is expected (fire-and-forget). ResponseChan chan try.Try[Response] }
Message represents a message sent to an actor, containing a request and an optional response channel. If ResponseChan is nil, the message is fire-and-forget. If provided, the actor will send the response (or error) to this channel after processing.
type Processor ¶
Processor defines the interface for processing messages within an actor. Implementations must handle the message, optionally sending responses via the message's ResponseChan.
func NewProcessor ¶
func NewProcessor[Request, Response any](processorFunc func(Message[Request, Response])) Processor[Request, Response]
NewProcessor creates a Processor from a function that processes messages. The function is responsible for handling response channels if present in the message.
func SimpleProcessor ¶
func SimpleProcessor[Request, Response any](f func(req Request) (Response, error)) Processor[Request, Response]
SimpleProcessor creates a Processor from a simple request-response function. It automatically handles response channel management: sending the result or error to the response channel if present, or logging errors for fire-and-forget messages.
type Ref ¶
type Ref[Request, Response any] struct { // contains filtered or unexported fields }
Ref is a reference to a running actor. It provides methods to send messages, make requests, and control the actor's lifecycle.
func (*Ref[Request, Response]) Publish ¶
Publish sends a complete message to the actor without waiting for a response. Errors are logged but not returned. Uses context.Background().
func (*Ref[Request, Response]) PublishCtx ¶
func (r *Ref[Request, Response]) PublishCtx(ctx context.Context, message Message[Request, Response])
PublishCtx sends a complete message to the actor without waiting for a response. Errors are logged but not returned. Respects the provided context for cancellation.
func (*Ref[Request, Response]) Request ¶
Request sends a request to the actor and blocks until a response is received. Uses context.Background(). Returns ErrDeadActor if the actor is stopped.
func (*Ref[Request, Response]) RequestCtx ¶
RequestCtx sends a request to the actor and blocks until a response is received or the context is canceled. Returns ErrDeadActor if the actor is stopped, or context error if context is canceled.
func (*Ref[Request, Response]) Send ¶
func (r *Ref[Request, Response]) Send(request Request)
Send sends a request to the actor without waiting for a response. This is a fire-and-forget operation. Errors are logged but not returned. Uses context.Background().
func (*Ref[Request, Response]) SendCtx ¶
SendCtx sends a request to the actor without waiting for a response. This is a fire-and-forget operation. Errors are logged but not returned. Respects the provided context for cancellation.