Documentation ¶
Overview ¶
Package actor declares the types used to represent actors in the Actor Model.
The actor model provides a high-level abstraction for writing concurrent and distributed systems. This approach relieves engineers of burdens such as explicit locks and concurrent access to shared state, because actors receive messages synchronously.
The following quote from Wikipedia distills the definition of an actor down to its essence:
In response to a message that it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
Creating Actors ¶
Props provide the building blocks for declaring how actors should be created. The following example defines an actor using a function literal to process messages:
var props *actor.Props = actor.PropsFromFunc(func(c actor.Context) {
	// process messages
})
Alternatively, a type which conforms to the Actor interface, by defining a single Receive method, can be used.
type MyActor struct{}

func (a *MyActor) Receive(c actor.Context) {
	// process messages
}

var props *actor.Props = actor.PropsFromProducer(func() actor.Actor { return &MyActor{} })
Spawn and SpawnNamed use the given props to create a running instance of an actor. Once spawned, the actor is ready to process incoming messages. To spawn an actor with an automatically generated unique name, use:
pid := context.Spawn(props)
The result of calling Spawn is a unique PID or process identifier.
Each time an actor is spawned, a new mailbox is created and associated with the PID. Messages are sent to the mailbox and then forwarded to the actor to process.
Processing Messages ¶
An actor processes messages via its Receive handler. The signature of this function is:
Receive(c actor.Context)
The actor system guarantees that this method is called synchronously, one message at a time, so there is no need to protect the actor's state inside calls to this function.
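The guarantee above can be illustrated without the library. The following is a minimal sketch using only the standard library, not the package's implementation: a buffered channel stands in for the mailbox, and a single goroutine plays the role of the Receive handler. The names counter, startCounter and sumConcurrently are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a toy actor. Its state is touched only by the goroutine
// that drains the mailbox, so no mutex is needed.
type counter struct {
	mailbox chan int // stand-in for an actor mailbox
	done    chan int // delivers the final total once the mailbox is closed
}

// startCounter launches the goroutine that processes messages one at a time.
func startCounter(buffer int) *counter {
	c := &counter{mailbox: make(chan int, buffer), done: make(chan int)}
	go func() {
		total := 0 // private state, never shared with other goroutines
		for n := range c.mailbox {
			total += n
		}
		c.done <- total
	}()
	return c
}

// sumConcurrently sends 1..n from n separate goroutines and returns the total.
func sumConcurrently(n int) int {
	c := startCounter(16)
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			c.mailbox <- v
		}(i)
	}
	wg.Wait()
	close(c.mailbox)
	return <-c.done
}

func main() {
	fmt.Println(sumConcurrently(100)) // prints 5050
}
```

Many senders write concurrently, but only one goroutine ever reads and updates total, which is why the handler itself needs no locking.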
Communicating With Actors ¶
A PID is the primary interface for sending messages to actors. Context.Send is used to send an asynchronous message to the actor associated with the PID:
context.Send(pid, "Hello World")
Depending on the requirements, communication between actors can take place synchronously or asynchronously. Regardless of the circumstances, actors always communicate via a PID.
When a message is sent using Context.Request or Context.RequestFuture, the actor which receives the message can respond using the Context.Sender method, which returns the PID of the sender.
For synchronous communication, an actor will use a Future and wait for the result before continuing. To send a message to an actor and wait for a response, use the RequestFuture method, which returns a Future:
f := context.RequestFuture(pid, "Hello", 50*time.Millisecond)
res, err := f.Result() // waits for pid to reply
Example ¶
Demonstrates how to create an actor using a function literal and how to send a message asynchronously.
context := system.Root
props := actor.PropsFromFunc(func(c actor.Context) {
	if msg, ok := c.Message().(string); ok {
		fmt.Println(msg) // outputs "Hello World"
	}
})
pid := context.Spawn(props)
context.Send(pid, "Hello World")
time.Sleep(time.Millisecond * 100)
_ = context.StopFuture(pid).Wait() // wait for the actor to stop
Output: Hello World
Example (Synchronous) ¶
Demonstrates how to send a message from one actor to another, with the caller waiting for a response before proceeding.
var wg sync.WaitGroup
wg.Add(1)
// callee will wait for the PING message
callee := system.Root.Spawn(actor.PropsFromFunc(func(c actor.Context) {
	if msg, ok := c.Message().(string); ok {
		fmt.Println(msg) // outputs PING
		c.Respond("PONG")
	}
}))
// caller will send a PING message and wait for the PONG
caller := system.Root.Spawn(actor.PropsFromFunc(func(c actor.Context) {
	switch msg := c.Message().(type) {
	// the first message an actor receives after it has started
	case *actor.Started:
		// send a PING to the callee, and specify that the response
		// is sent to Self, which is this actor's PID
		c.Request(callee, "PING")
	case string:
		fmt.Println(msg) // PONG
		wg.Done()
	}
}))
wg.Wait()
_ = system.Root.StopFuture(callee).Wait()
_ = system.Root.StopFuture(caller).Wait()
Output: PING PONG
Index ¶
- Constants
- Variables
- func MessageName(msg any) string
- func MessageType(msg any) string
- func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess
- func NewGuardians(actorSystem *ActorSystem) *guardiansValue
- func NewPriorityGoringQueue() *priorityQueue
- func NewPriorityMpscQueue() *priorityQueue
- func NewPriorityQueue(queueProducer func() queue) *priorityQueue
- func SubscribeSupervision(actorSystem *ActorSystem) *eventstream.Subscription
- func SystemLabels(system *ActorSystem) []attribute.KeyValue
- func UnwrapEnvelope(message any) (ReadonlyMessageHeader, any, *PID)
- func UnwrapEnvelopeMessage(message any) any
- type Actor
- type ActorProcess
- type ActorSystem
- func (as *ActorSystem) Address() string
- func (as *ActorSystem) GetHostPort() (host string, port int, err error)
- func (as *ActorSystem) IsStopped() bool
- func (as *ActorSystem) Logger() *slog.Logger
- func (as *ActorSystem) NewLocalPID(id string) *PID
- func (as *ActorSystem) OnStop(f func())
- func (as *ActorSystem) Shutdown()
- type AddressResolver
- type AutoReceiveMessage
- type AutoRespond
- type Behavior
- type CapturedContext
- type Config
- type ConfigOption
- func WithDeadLetterRequestLogging(enabled bool) ConfigOption
- func WithDeadLetterThrottleCount(count int32) ConfigOption
- func WithDeadLetterThrottleInterval(duration time.Duration) ConfigOption
- func WithDefaultPrometheusProvider(port ...int) ConfigOption
- func WithDeveloperSupervisionLogging(enabled bool) ConfigOption
- func WithDiagnosticsSerializer(serializer func(Actor) string) ConfigOption
- func WithLoggerFactory(factory func(system *ActorSystem) *slog.Logger) ConfigOption
- func WithMetricProviders(provider metric.MeterProvider) ConfigOption
- func WithRequestTimeout(d time.Duration) ConfigOption
- func WithStopTimeout(d time.Duration) ConfigOption
- func WithSystemID(id string) ConfigOption
- type Context
- type ContextDecorator
- type ContextDecoratorFunc
- type DeadLetterEvent
- type DeadLetterResponse
- func (*DeadLetterResponse) Descriptor() ([]byte, []int) (deprecated)
- func (x *DeadLetterResponse) GetTarget() *PID
- func (*DeadLetterResponse) ProtoMessage()
- func (x *DeadLetterResponse) ProtoReflect() protoreflect.Message
- func (x *DeadLetterResponse) Reset()
- func (x *DeadLetterResponse) String() string
- type DeciderFunc
- type Deduplicator
- type Directive
- type Dispatcher
- type EventStreamProcess
- type ExtensionContext
- type Failure
- type Future
- type IgnoreDeadLetterLogging
- type InfrastructureMessage
- type Mailbox
- type MailboxMessage
- type MailboxMiddleware
- type MailboxProducer
- func Bounded(size int, mailboxStats ...MailboxMiddleware) MailboxProducer
- func BoundedDropping(size int, mailboxStats ...MailboxMiddleware) MailboxProducer
- func Unbounded(mailboxStats ...MailboxMiddleware) MailboxProducer
- func UnboundedLockfree(mailboxStats ...MailboxMiddleware) MailboxProducer
- func UnboundedPriority(mailboxStats ...MailboxMiddleware) MailboxProducer
- func UnboundedPriorityMpsc(mailboxStats ...MailboxMiddleware) MailboxProducer
- type MessageBatch
- type MessageEnvelope
- type MessageInvoker
- type Metrics
- type NotInfluenceReceiveTimeout
- type PID
- func (*PID) Descriptor() ([]byte, []int) (deprecated)
- func (pid *PID) Equal(other *PID) bool
- func (x *PID) GetAddress() string
- func (x *PID) GetId() string
- func (x *PID) GetRequestId() uint32
- func (*PID) ProtoMessage()
- func (x *PID) ProtoReflect() protoreflect.Message
- func (x *PID) Reset()
- func (x *PID) String() string
- type PIDSet
- func (p *PIDSet) Add(v *PID)
- func (p *PIDSet) Clear()
- func (p *PIDSet) Clone() *PIDSet
- func (p *PIDSet) Contains(v *PID) bool
- func (p *PIDSet) Empty() bool
- func (p *PIDSet) ForEach(f func(i int, pid *PID))
- func (p *PIDSet) Get(index int) *PID
- func (p *PIDSet) Len() int
- func (p *PIDSet) Remove(v *PID) bool
- func (p *PIDSet) Values() []*PID
- type PoisonPill
- type PriorityMessage
- type Process
- type ProcessRegistryValue
- func (pr *ProcessRegistryValue) Add(process Process, id string) (*PID, bool)
- func (pr *ProcessRegistryValue) Get(pid *PID) (Process, bool)
- func (pr *ProcessRegistryValue) GetLocal(id string) (Process, bool)
- func (pr *ProcessRegistryValue) NextID() string
- func (pr *ProcessRegistryValue) RegisterAddressResolver(handler AddressResolver)
- func (pr *ProcessRegistryValue) Remove(pid *PID)
- type Producer
- type ProducerWithActorSystem
- type Props
- type PropsOption
- func WithContextDecorator(contextDecorator ...ContextDecorator) PropsOption
- func WithDispatcher(dispatcher Dispatcher) PropsOption
- func WithFunc(f ReceiveFunc) PropsOption
- func WithGuardian(guardian SupervisorStrategy) PropsOption
- func WithMailbox(mailbox MailboxProducer) PropsOption
- func WithOnInit(init ...func(ctx Context)) PropsOption
- func WithProducer(p Producer) PropsOption
- func WithReceiverMiddleware(middleware ...ReceiverMiddleware) PropsOption
- func WithSenderMiddleware(middleware ...SenderMiddleware) PropsOption
- func WithSpawnFunc(spawn SpawnFunc) PropsOption
- func WithSpawnMiddleware(middleware ...SpawnMiddleware) PropsOption
- func WithSupervisor(supervisor SupervisorStrategy) PropsOption
- type ReadonlyMessageHeader
- type ReceiveFunc
- type ReceiveTimeout
- type ReceiverContext
- type ReceiverFunc
- type ReceiverMiddleware
- type Restart
- type RestartStatistics
- type Restarting
- type ResumeMailbox
- type RootContext
- func (rc *RootContext) Actor() Actor
- func (rc *RootContext) ActorSystem() *ActorSystem
- func (rc RootContext) Copy() *RootContext
- func (rc *RootContext) Logger() *slog.Logger
- func (rc *RootContext) Message() any
- func (rc *RootContext) MessageHeader() ReadonlyMessageHeader
- func (rc *RootContext) Parent() *PID
- func (rc *RootContext) Poison(pid *PID)
- func (rc *RootContext) PoisonFuture(pid *PID) Future
- func (rc *RootContext) Request(pid *PID, message any)
- func (rc *RootContext) RequestFuture(pid *PID, message any, timeout time.Duration) Future
- func (rc *RootContext) RequestWithCustomSender(pid *PID, message any, sender *PID)
- func (rc *RootContext) Self() *PID
- func (rc *RootContext) Send(pid *PID, message any)
- func (rc *RootContext) Sender() *PID
- func (rc *RootContext) Spawn(props *Props) *PID
- func (rc *RootContext) SpawnNamed(props *Props, name string) (*PID, error)
- func (rc *RootContext) SpawnPrefix(props *Props, prefix string) *PID
- func (rc *RootContext) Stop(pid *PID)
- func (rc *RootContext) StopFuture(pid *PID) Future
- func (rc *RootContext) TrySpawn(props *Props) (*PID, error)
- func (rc *RootContext) TrySpawnPrefix(props *Props, prefix string) (*PID, error)
- func (rc *RootContext) WithGuardian(guardian SupervisorStrategy) *RootContext
- func (rc *RootContext) WithHeaders(headers map[string]string) *RootContext
- func (rc *RootContext) WithSenderMiddleware(middleware ...SenderMiddleware) *RootContext
- func (rc *RootContext) WithSpawnMiddleware(middleware ...SpawnMiddleware) *RootContext
- type SenderContext
- type SenderFunc
- type SenderMiddleware
- type ShouldThrottle
- type SliceMap
- type SpawnFunc
- type SpawnMiddleware
- type SpawnerContext
- type Started
- type Stop
- type Stopped
- type Stopping
- type Supervisor
- type SupervisorEvent
- type SupervisorStrategy
- func DefaultSupervisorStrategy() SupervisorStrategy
- func NewAllForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
- func NewExponentialBackoffStrategy(backoffWindow time.Duration, initialBackoff time.Duration) SupervisorStrategy
- func NewOneForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
- func NewRestartingStrategy() SupervisorStrategy
- func RestartingSupervisorStrategy() SupervisorStrategy
- type SuspendMailbox
- type SystemMessage
- type Terminated
- func (*Terminated) Descriptor() ([]byte, []int) (deprecated)
- func (x *Terminated) GetWho() *PID
- func (x *Terminated) GetWhy() TerminatedReason
- func (*Terminated) ProtoMessage()
- func (x *Terminated) ProtoReflect() protoreflect.Message
- func (x *Terminated) Reset()
- func (x *Terminated) String() string
- func (*Terminated) SystemMessage()
- type TerminatedReason
- func (TerminatedReason) Descriptor() protoreflect.EnumDescriptor
- func (x TerminatedReason) Enum() *TerminatedReason
- func (TerminatedReason) EnumDescriptor() ([]byte, []int) (deprecated)
- func (x TerminatedReason) Number() protoreflect.EnumNumber
- func (x TerminatedReason) String() string
- func (TerminatedReason) Type() protoreflect.EnumType
- type Touch
- type Touched
- type Unwatch
- type Valve
- type Watch
Constants ¶
const (
DefaultPriority = int8(priorityLevels / 2)
)
Variables ¶
var (
	TerminatedReason_name = map[int32]string{
		0: "Stopped",
		1: "AddressTerminated",
		2: "NotFound",
	}
	TerminatedReason_value = map[string]int32{
		"Stopped":           0,
		"AddressTerminated": 1,
		"NotFound":          2,
	}
)
Enum value maps for TerminatedReason.
var EmptyMessageHeader = make(messageHeader)
EmptyMessageHeader represents an empty message header.
var ErrDeadLetter = errors.New("future: dead letter")
ErrDeadLetter is the error used when a request is sent to an unreachable PID.
var ErrNameExists = errors.New("spawn: name exists")
ErrNameExists is the error used when an existing name is used for spawning an actor.
var ErrTimeout = errors.New("future: timeout")
ErrTimeout is the error used when a future times out before receiving a result.
var File_actor_proto protoreflect.FileDescriptor
Functions ¶
func MessageName ¶
func MessageName(msg any) string
MessageName returns the message type name without a leading pointer prefix. This is useful for metrics where stable type names are preferred. If msg is nil, "<nil>" is returned.
func MessageType ¶
func MessageType(msg any) string
MessageType returns the full type name of the given message, including any pointer prefix. It is typically used in logging where the exact Go type is desired. If msg is nil, "<nil>" is returned.
func NewDeadLetter ¶
func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess
func NewGuardians ¶
func NewGuardians(actorSystem *ActorSystem) *guardiansValue
func NewPriorityGoringQueue ¶
func NewPriorityGoringQueue() *priorityQueue
func NewPriorityMpscQueue ¶
func NewPriorityMpscQueue() *priorityQueue
func NewPriorityQueue ¶
func NewPriorityQueue(queueProducer func() queue) *priorityQueue
func SubscribeSupervision ¶
func SubscribeSupervision(actorSystem *ActorSystem) *eventstream.Subscription
SubscribeSupervision subscribes to supervision events on the given actor system's event stream. It returns the subscription so callers can unsubscribe later.
func SystemLabels ¶
func SystemLabels(system *ActorSystem) []attribute.KeyValue
SystemLabels returns a standard set of attributes that identify the actor system emitting the metric. These labels are used across modules to maintain consistency in OpenTelemetry reporting.
func UnwrapEnvelope ¶
func UnwrapEnvelope(message any) (ReadonlyMessageHeader, any, *PID)
UnwrapEnvelope extracts header, message and sender from an envelope.
func UnwrapEnvelopeMessage ¶
func UnwrapEnvelopeMessage(message any) any
UnwrapEnvelopeMessage returns the message from an envelope.
Types ¶
type Actor ¶
type Actor interface {
Receive(c Context)
}
Actor is the interface that defines the Receive method.
Receive is passed each message to be processed from the mailbox associated with the instance of the actor.
type ActorProcess ¶
type ActorProcess struct {
// contains filtered or unexported fields
}
ActorProcess is the Process implementation for regular actors.
func NewActorProcess ¶
func NewActorProcess(mailbox Mailbox) *ActorProcess
NewActorProcess creates a new ActorProcess with the given mailbox.
func (*ActorProcess) SendSystemMessage ¶
func (ref *ActorProcess) SendSystemMessage(_ *PID, message any)
SendSystemMessage posts a system message to the actor's mailbox.
func (*ActorProcess) SendUserMessage ¶
func (ref *ActorProcess) SendUserMessage(_ *PID, message any)
SendUserMessage posts a user message to the actor's mailbox.
func (*ActorProcess) Stop ¶
func (ref *ActorProcess) Stop(pid *PID)
Stop terminates the actor process.
type ActorSystem ¶
type ActorSystem struct {
ProcessRegistry *ProcessRegistryValue
Root *RootContext
EventStream *eventstream.EventStream
Guardians *guardiansValue
DeadLetter *deadLetterProcess
Extensions *extensions.Extensions
Config *Config
ID string
// contains filtered or unexported fields
}
ActorSystem is the runtime environment that hosts actors and manages their execution, supervision, and system-wide services.
func NewActorSystem ¶
func NewActorSystem(options ...ConfigOption) *ActorSystem
NewActorSystem creates a new actor system with optional configuration options. It panics if the configuration is invalid. Use NewActorSystemWithError for a non-panicking variant.
func NewActorSystemWithConfig ¶
func NewActorSystemWithConfig(config *Config) *ActorSystem
NewActorSystemWithConfig creates a new actor system using an explicit configuration struct.
func NewActorSystemWithError ¶
func NewActorSystemWithError(options ...ConfigOption) (*ActorSystem, error)
NewActorSystemWithError creates a new actor system with optional configuration options. It returns an error if the configuration is invalid.
func (*ActorSystem) Address ¶
func (as *ActorSystem) Address() string
Address returns the network address of the actor system.
func (*ActorSystem) GetHostPort ¶
func (as *ActorSystem) GetHostPort() (host string, port int, err error)
func (*ActorSystem) IsStopped ¶
func (as *ActorSystem) IsStopped() bool
func (*ActorSystem) Logger ¶
func (as *ActorSystem) Logger() *slog.Logger
Logger returns the logger associated with the actor system.
func (*ActorSystem) NewLocalPID ¶
func (as *ActorSystem) NewLocalPID(id string) *PID
NewLocalPID creates a PID for a local actor with the given id.
func (*ActorSystem) OnStop ¶
func (as *ActorSystem) OnStop(f func())
func (*ActorSystem) Shutdown ¶
func (as *ActorSystem) Shutdown()
type AddressResolver ¶
An AddressResolver is used to resolve remote actors
type AutoReceiveMessage ¶
type AutoReceiveMessage interface {
AutoReceiveMessage()
}
An AutoReceiveMessage is a special kind of user message that will be handled in some way automatically by the actor
type AutoRespond ¶
type Behavior ¶
type Behavior []ReceiveFunc
func NewBehavior ¶
func NewBehavior() Behavior
func (*Behavior) Become ¶
func (b *Behavior) Become(receive ReceiveFunc)
func (*Behavior) BecomeStacked ¶
func (b *Behavior) BecomeStacked(receive ReceiveFunc)
func (*Behavior) UnbecomeStacked ¶
func (b *Behavior) UnbecomeStacked()
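The Behavior methods carry no description here, so the following standard-library sketch rests on an assumption drawn from their names: Become replaces the whole stack, BecomeStacked pushes a handler, and UnbecomeStacked pops back to the previous one. The types handler and behavior are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for a receive function.
type handler func(msg string) string

// behavior is a stack of handlers; the last element is the active one.
type behavior []handler

// become discards the stack and installs h as the only handler.
func (b *behavior) become(h handler) { *b = behavior{h} }

// becomeStacked pushes h on top, keeping earlier handlers underneath.
func (b *behavior) becomeStacked(h handler) { *b = append(*b, h) }

// unbecomeStacked pops the top handler, always keeping at least one.
func (b *behavior) unbecomeStacked() {
	if len(*b) > 1 {
		*b = (*b)[:len(*b)-1]
	}
}

// receive dispatches msg to the active handler, if any.
func (b *behavior) receive(msg string) string {
	if len(*b) == 0 {
		return ""
	}
	return (*b)[len(*b)-1](msg)
}

func main() {
	var b behavior
	b.become(func(m string) string { return "idle:" + m })
	b.becomeStacked(func(m string) string { return "busy:" + m })
	fmt.Println(b.receive("job")) // busy:job
	b.unbecomeStacked()
	fmt.Println(b.receive("job")) // idle:job
}
```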
type CapturedContext ¶
type CapturedContext struct {
MessageEnvelope *MessageEnvelope
Context Context
}
CapturedContext holds a message envelope along with the context it was captured from.
func (*CapturedContext) Apply ¶
func (cc *CapturedContext) Apply()
Apply restores the stored message to the actor context so it can be re-processed by the actor.
func (*CapturedContext) Receive ¶
func (cc *CapturedContext) Receive()
Receive reprocesses the captured message on the captured context. It captures the current context state before processing and restores it afterwards.
type Config ¶
type Config struct {
DeadLetterThrottleInterval time.Duration // throttle deadletter logging after this interval
DeadLetterThrottleCount int32 // throttle deadletter logging after this count
DeadLetterRequestLogging bool // do not log dead-letters with sender
DeveloperSupervisionLogging bool // console log and promote supervision logs to Warning level
DiagnosticsSerializer func(Actor) string // extract diagnostics from actor and return as string
MetricsProvider metric.MeterProvider
// MetricsEnabled toggles emission of Proto.Actor metrics.
MetricsEnabled bool
LoggerFactory func(system *ActorSystem) *slog.Logger
// StopTimeout is the timeout used for StopFuture and PoisonFuture calls.
StopTimeout time.Duration
// RequestTimeout is the default timeout used for request operations.
RequestTimeout time.Duration
// SystemID overrides the auto-generated short UUID for the actor system.
// When set, this value is used as ActorSystem.ID instead of generating
// a random shortuuid. This enables human-readable cluster member names.
SystemID string
}
Config holds configuration options for an ActorSystem.
func Configure ¶
func Configure(options ...ConfigOption) *Config
Configure sets the configuration options. It panics if the resulting configuration is invalid. Use ConfigureWithError for a non-panicking variant.
func ConfigureWithError ¶
func ConfigureWithError(options ...ConfigOption) (*Config, error)
ConfigureWithError sets the configuration options and validates the result. It returns an error if the configuration is invalid.
type ConfigOption ¶
type ConfigOption func(config *Config)
ConfigOption is a function that configures the actor system
func WithDeadLetterRequestLogging ¶
func WithDeadLetterRequestLogging(enabled bool) ConfigOption
WithDeadLetterRequestLogging sets the dead letter request logging on or off
func WithDeadLetterThrottleCount ¶
func WithDeadLetterThrottleCount(count int32) ConfigOption
WithDeadLetterThrottleCount sets the dead letter throttle count
func WithDeadLetterThrottleInterval ¶
func WithDeadLetterThrottleInterval(duration time.Duration) ConfigOption
WithDeadLetterThrottleInterval sets the dead letter throttle interval
func WithDefaultPrometheusProvider ¶
func WithDefaultPrometheusProvider(port ...int) ConfigOption
WithDefaultPrometheusProvider sets the default prometheus provider
func WithDeveloperSupervisionLogging ¶
func WithDeveloperSupervisionLogging(enabled bool) ConfigOption
WithDeveloperSupervisionLogging sets the developer supervision logging on or off
func WithDiagnosticsSerializer ¶
func WithDiagnosticsSerializer(serializer func(Actor) string) ConfigOption
WithDiagnosticsSerializer sets the diagnostics serializer
func WithLoggerFactory ¶
func WithLoggerFactory(factory func(system *ActorSystem) *slog.Logger) ConfigOption
WithLoggerFactory sets the logger factory to use for the actor system
func WithMetricProviders ¶
func WithMetricProviders(provider metric.MeterProvider) ConfigOption
WithMetricProviders sets the metric providers
func WithRequestTimeout ¶
func WithRequestTimeout(d time.Duration) ConfigOption
WithRequestTimeout sets the default timeout used for request operations.
func WithStopTimeout ¶
func WithStopTimeout(d time.Duration) ConfigOption
WithStopTimeout sets the timeout used for StopFuture and PoisonFuture calls.
func WithSystemID ¶
func WithSystemID(id string) ConfigOption
WithSystemID sets a custom system ID for the actor system, overriding the default random short UUID. This enables human-readable cluster member names (e.g., "agg-node-1" instead of "C2UAuL74WkCNhAsGcEWqJx").
type Context ¶
type Context interface {
// contains filtered or unexported methods
}
Context contains contextual information for actors
Example (SetReceiveTimeout) ¶
SetReceiveTimeout allows an actor to be notified repeatedly if it does not receive any messages for a specified duration
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/awevoke/protoactor-go/actor"
)

// system is the actor system that hosts the example actor.
var system = actor.NewActorSystem()

type setReceiveTimeoutActor struct {
	*sync.WaitGroup
}

// Receive is the default message handler when an actor is started
func (f *setReceiveTimeoutActor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *actor.Started:
		// when the actor starts, set the receive timeout to 10 milliseconds.
		//
		// the system will send an *actor.ReceiveTimeout message every time the timeout
		// expires until SetReceiveTimeout is called again with a duration < 1 ms
		context.SetReceiveTimeout(10 * time.Millisecond)
	case *actor.ReceiveTimeout:
		fmt.Println("timed out")
		f.Done()
	}
}

// SetReceiveTimeout allows an actor to be notified repeatedly if it does not receive any messages
// for a specified duration
func main() {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	pid := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor { return &setReceiveTimeoutActor{wg} }))
	defer func() {
		_ = system.Root.StopFuture(pid).Wait()
	}()
	wg.Wait() // wait for the ReceiveTimeout message
}
Output: timed out
type ContextDecorator ¶
type ContextDecorator func(next ContextDecoratorFunc) ContextDecoratorFunc
func DeduplicationContext ¶
func DeduplicationContext(fn Deduplicator, ttl time.Duration) ContextDecorator
DeduplicationContext returns a context decorator that ignores duplicate messages as determined by the provided Deduplicator function. Keys expire after the given TTL, allowing messages to be processed again once the entry is purged.
type ContextDecoratorFunc ¶
ContextDecoratorFunc modifies a context before it is used by an actor.
type DeadLetterEvent ¶
type DeadLetterEvent struct {
PID *PID // The invalid process, to which the message was sent
Message any // The message that could not be delivered
Sender *PID // the process that sent the Message
}
A DeadLetterEvent is published via event.Publish when a message is sent to a nonexistent PID
type DeadLetterResponse ¶
type DeadLetterResponse struct {
Target *PID `protobuf:"bytes,1,opt,name=Target,proto3" json:"Target,omitempty"`
// contains filtered or unexported fields
}
func (*DeadLetterResponse) Descriptor (deprecated) ¶
func (*DeadLetterResponse) Descriptor() ([]byte, []int)
Deprecated: Use DeadLetterResponse.ProtoReflect.Descriptor instead.
func (*DeadLetterResponse) GetTarget ¶
func (x *DeadLetterResponse) GetTarget() *PID
func (*DeadLetterResponse) ProtoMessage ¶
func (*DeadLetterResponse) ProtoMessage()
func (*DeadLetterResponse) ProtoReflect ¶
func (x *DeadLetterResponse) ProtoReflect() protoreflect.Message
func (*DeadLetterResponse) Reset ¶
func (x *DeadLetterResponse) Reset()
func (*DeadLetterResponse) String ¶
func (x *DeadLetterResponse) String() string
type DeciderFunc ¶
DeciderFunc is a function which is called by a SupervisorStrategy
type Deduplicator ¶
Deduplicator extracts a deduplication key from a message. Messages yielding the same key within the TTL are treated as duplicates.
type Directive ¶
type Directive int
Directive is an enum for supervision actions
const (
	// ResumeDirective instructs the supervisor to resume the actor and continue processing messages
	ResumeDirective Directive = iota
	// RestartDirective instructs the supervisor to discard the actor, replacing it with a new instance,
	// before processing additional messages
	RestartDirective
	// StopDirective instructs the supervisor to stop the actor
	StopDirective
	// EscalateDirective instructs the supervisor to escalate handling of the failure to the actor's parent supervisor
	EscalateDirective
)
Directive determines how a supervisor should handle a faulting actor
func DefaultDecider ¶
DefaultDecider is a decider that will always restart the failing child actor
type Dispatcher ¶
type Dispatcher interface {
// Schedule enqueues the provided function for execution.
Schedule(fn func())
// Throughput returns the number of messages processed per scheduling pass.
Throughput() int
}
Dispatcher schedules work for actors and reports processing throughput.
func NewDefaultDispatcher ¶
func NewDefaultDispatcher(throughput int) Dispatcher
NewDefaultDispatcher creates a Dispatcher that schedules work on separate goroutines.
func NewSynchronizedDispatcher ¶
func NewSynchronizedDispatcher(throughput int) Dispatcher
NewSynchronizedDispatcher creates a Dispatcher that executes work sequentially.
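The two scheduling styles can be sketched against an interface of the same shape as Dispatcher. This is a standard-library illustration, not the package's dispatchers: goroutineDispatcher, inlineDispatcher and runAll are hypothetical, and running work inline on the caller is only one possible reading of "sequentially".

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher mirrors the shape of the Dispatcher interface.
type dispatcher interface {
	Schedule(fn func())
	Throughput() int
}

// goroutineDispatcher runs each scheduled function on its own goroutine.
type goroutineDispatcher int

func (d goroutineDispatcher) Schedule(fn func()) { go fn() }
func (d goroutineDispatcher) Throughput() int    { return int(d) }

// inlineDispatcher runs each scheduled function immediately on the caller.
type inlineDispatcher int

func (d inlineDispatcher) Schedule(fn func()) { fn() }
func (d inlineDispatcher) Throughput() int    { return int(d) }

// runAll schedules n tasks on d, waits for all of them, and returns
// how many ran.
func runAll(d dispatcher, n int) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		d.Schedule(func() {
			defer wg.Done()
			mu.Lock()
			count++
			mu.Unlock()
		})
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(runAll(goroutineDispatcher(300), 50)) // 50
	fmt.Println(runAll(inlineDispatcher(300), 50))    // 50
}
```

Both dispatchers complete the same work; they differ in where it runs, which matters for ordering and for tests that need deterministic execution.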
type EventStreamProcess ¶
type EventStreamProcess struct {
// contains filtered or unexported fields
}
func NewEventStreamProcess ¶
func NewEventStreamProcess(actorSystem *ActorSystem) *EventStreamProcess
func (*EventStreamProcess) SendSystemMessage ¶
func (e *EventStreamProcess) SendSystemMessage(_ *PID, _ any)
func (*EventStreamProcess) SendUserMessage ¶
func (e *EventStreamProcess) SendUserMessage(_ *PID, message any)
func (*EventStreamProcess) Stop ¶
func (e *EventStreamProcess) Stop(_ *PID)
type ExtensionContext ¶
type ExtensionContext interface {
// contains filtered or unexported methods
}
ExtensionContext exposes extension-related functionality for actors.
type Failure ¶
type Failure struct {
Who *PID
Reason any
RestartStats *RestartStatistics
Message any
}
A Failure message is sent to an actor's parent when an exception is thrown by one of the actor's methods.
func (*Failure) SystemMessage ¶
func (*Failure) SystemMessage()
SystemMessage marks Failure as a system message.
type Future ¶
type Future interface {
// PID to the backing actor for the Future result.
PID() *PID
// PipeTo forwards the result or error of the future to the specified PIDs.
PipeTo(pids ...*PID)
// Result waits for the future to resolve and returns the result or error.
Result() (any, error)
// Wait blocks until the future resolves and returns the error, if any.
Wait() error
}
Future defines the public interface for future responses.
type IgnoreDeadLetterLogging ¶
type IgnoreDeadLetterLogging interface {
IgnoreDeadLetterLogging()
}
IgnoreDeadLetterLogging messages are not logged in deadletter log
type InfrastructureMessage ¶
type InfrastructureMessage interface {
InfrastructureMessage()
}
InfrastructureMessage is a marker for all built in Proto.Actor messages
type Mailbox ¶
type Mailbox interface {
PostUserMessage(message any)
PostSystemMessage(message any)
RegisterHandlers(invoker MessageInvoker, dispatcher Dispatcher)
Start()
UserMessageCount() int
}
Mailbox interface is used to enqueue messages to the mailbox
type MailboxMessage ¶
type MailboxMessage interface {
MailboxMessage()
}
MailboxMessage marks a message that can be processed by a mailbox.
type MailboxMiddleware ¶
type MailboxMiddleware interface {
MailboxStarted()
MessagePosted(message any)
MessageReceived(message any)
MailboxEmpty()
}
MailboxMiddleware is an interface for intercepting messages and events in the mailbox
type MailboxProducer ¶
type MailboxProducer func() Mailbox
MailboxProducer is a function which creates a new mailbox
func Bounded ¶
func Bounded(size int, mailboxStats ...MailboxMiddleware) MailboxProducer
Bounded returns a producer which creates a bounded mailbox of the specified size.
func BoundedDropping ¶
func BoundedDropping(size int, mailboxStats ...MailboxMiddleware) MailboxProducer
BoundedDropping returns a producer which creates a bounded mailbox of the specified size that drops the front element on push.
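The dropping policy can be sketched with a small queue. This is an illustration rather than the mailbox implementation, and it assumes the front element is dropped only when the queue is already full; droppingQueue and push are hypothetical names.

```go
package main

import "fmt"

// droppingQueue keeps at most size items and evicts the oldest on overflow.
type droppingQueue struct {
	items []string
	size  int
}

// push appends v; if the queue is full, the front (oldest) item is dropped first.
func (q *droppingQueue) push(v string) {
	if q.size <= 0 {
		return // a queue without capacity holds nothing
	}
	if len(q.items) == q.size {
		q.items = q.items[1:]
	}
	q.items = append(q.items, v)
}

func main() {
	q := &droppingQueue{size: 3}
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		q.push(m)
	}
	fmt.Println(q.items) // [c d e]
}
```

With this policy a slow consumer sees the most recent messages, at the cost of silently losing the oldest ones.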
func Unbounded ¶
func Unbounded(mailboxStats ...MailboxMiddleware) MailboxProducer
Unbounded returns a producer which creates an unbounded mailbox
func UnboundedLockfree ¶
func UnboundedLockfree(mailboxStats ...MailboxMiddleware) MailboxProducer
UnboundedLockfree returns a producer which creates an unbounded, lock-free mailbox. This mailbox is cheaper to allocate, but has a slower throughput than the plain Unbounded mailbox.
func UnboundedPriority ¶
func UnboundedPriority(mailboxStats ...MailboxMiddleware) MailboxProducer
func UnboundedPriorityMpsc ¶
func UnboundedPriorityMpsc(mailboxStats ...MailboxMiddleware) MailboxProducer
type MessageBatch ¶
type MessageBatch interface {
GetMessages() []any
}
MessageBatch represents a collection of messages delivered together.
type MessageEnvelope ¶
MessageEnvelope wraps a message along with optional headers and sender.
func EnvelopeWithHeaders ¶
func EnvelopeWithHeaders(message any, headers map[string]string) *MessageEnvelope
EnvelopeWithHeaders returns a *MessageEnvelope carrying message with the given headers attached. If message is already a *MessageEnvelope, the returned envelope is a copy: existing envelope header values win on key conflict (explicit wrap is more specific than caller-provided headers). The caller's envelope is never mutated. If headers is empty, an existing envelope is returned as-is and a raw message is wrapped without a header map.
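The merge rules above are easier to follow in code. The sketch below restates them with a hypothetical envelope type and withHeaders function; it illustrates the documented behavior and is not the package's implementation.

```go
package main

import "fmt"

// envelope is a hypothetical, reduced message envelope.
type envelope struct {
	header  map[string]string
	message any
}

// withHeaders wraps message with headers, following the rules described above.
func withHeaders(message any, headers map[string]string) *envelope {
	env, wrapped := message.(*envelope)
	if len(headers) == 0 {
		if wrapped {
			return env // existing envelope returned as-is
		}
		return &envelope{message: message} // raw message, no header map
	}
	merged := make(map[string]string, len(headers))
	for k, v := range headers {
		merged[k] = v
	}
	if !wrapped {
		return &envelope{header: merged, message: message}
	}
	for k, v := range env.header {
		merged[k] = v // existing envelope values win on conflict
	}
	return &envelope{header: merged, message: env.message} // copy, original untouched
}

func main() {
	original := &envelope{header: map[string]string{"trace": "a"}, message: "hi"}
	out := withHeaders(original, map[string]string{"trace": "b", "user": "u"})
	fmt.Println(out.header["trace"], out.header["user"]) // a u
	fmt.Println(len(original.header))                    // 1
}
```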
func WrapEnvelope ¶
func WrapEnvelope(message any) *MessageEnvelope
WrapEnvelope ensures the message is inside a MessageEnvelope.
func (*MessageEnvelope) GetHeader ¶
func (envelope *MessageEnvelope) GetHeader(key string) string
GetHeader returns the value of a header key.
func (*MessageEnvelope) SetHeader ¶
func (envelope *MessageEnvelope) SetHeader(key string, value string)
SetHeader sets a header key to the given value.
type MessageInvoker ¶
type MessageInvoker interface {
InvokeSystemMessage(any)
InvokeUserMessage(any)
EscalateFailure(reason any, message any)
}
MessageInvoker is the interface used by a mailbox to forward messages for processing
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics provides access to system-wide metric instrumentation.
func NewMetrics ¶
func NewMetrics(system *ActorSystem, provider metric.MeterProvider) *Metrics
NewMetrics initializes metrics collection for the given actor system using the supplied OpenTelemetry MeterProvider. It also sets the global MeterProvider via otel.SetMeterProvider, which changes process-wide state so other packages will use the same provider.
func (*Metrics) CommonLabels ¶
CommonLabels returns the default set of labels for an actor metric, including system-wide labels and the specific actor type.
func (*Metrics) ExtensionID ¶
func (m *Metrics) ExtensionID() extensions.ExtensionID
ExtensionID returns the unique ID for the metrics extension.
type NotInfluenceReceiveTimeout ¶
type NotInfluenceReceiveTimeout interface {
NotInfluenceReceiveTimeout()
}
NotInfluenceReceiveTimeout messages will not reset the ReceiveTimeout timer of an actor that receives the message
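Marker interfaces like this one are typically detected with a type assertion. The sketch below shows the idea in isolation; the lower-case interface copy, the heartbeat and order types, and resetsReceiveTimeout are all hypothetical names, not part of the package.

```go
package main

import "fmt"

// notInfluenceReceiveTimeout mirrors the marker interface shown above.
type notInfluenceReceiveTimeout interface {
	NotInfluenceReceiveTimeout()
}

// heartbeat opts out of resetting the timer by implementing the marker method.
type heartbeat struct{}

func (heartbeat) NotInfluenceReceiveTimeout() {}

// order is an ordinary message with no marker method.
type order struct{ ID int }

// resetsReceiveTimeout reports whether a message would reset a
// receive-timeout timer under the rule described above.
func resetsReceiveTimeout(msg any) bool {
	_, marked := msg.(notInfluenceReceiveTimeout)
	return !marked
}

func main() {
	fmt.Println(resetsReceiveTimeout(order{ID: 1})) // true
	fmt.Println(resetsReceiveTimeout(heartbeat{}))  // false
}
```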
type PID ¶
type PID struct {
Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Id string `protobuf:"bytes,2,opt,name=Id,proto3" json:"Id,omitempty"`
RequestId uint32 `protobuf:"varint,3,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
// contains filtered or unexported fields
}
func UnwrapEnvelopeSender ¶
UnwrapEnvelopeSender returns the sender from an envelope.
func (*PID) Descriptor
deprecated
func (*PID) GetAddress ¶
func (*PID) GetRequestId ¶
func (*PID) ProtoMessage ¶
func (*PID) ProtoMessage()
func (*PID) ProtoReflect ¶
func (x *PID) ProtoReflect() protoreflect.Message
type PIDSet ¶
type PIDSet struct {
// contains filtered or unexported fields
}
PIDSet is a set of PIDs backed by a slice and lookup map.
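A set "backed by a slice and lookup map" usually keeps the slice for iteration and the map for constant-time membership checks. The following is a minimal sketch of that layout using plain strings; idSet and its methods are hypothetical and may differ from how PIDSet is actually written.

```go
package main

import "fmt"

// idSet is a hypothetical set of ids: the slice keeps the members for
// iteration and the map records each member's index in the slice.
type idSet struct {
	ids    []string
	lookup map[string]int
}

func newIDSet() *idSet { return &idSet{lookup: make(map[string]int)} }

// Add inserts id and reports whether it was newly added.
func (s *idSet) Add(id string) bool {
	if _, ok := s.lookup[id]; ok {
		return false
	}
	s.lookup[id] = len(s.ids)
	s.ids = append(s.ids, id)
	return true
}

// Contains reports membership via the map.
func (s *idSet) Contains(id string) bool {
	_, ok := s.lookup[id]
	return ok
}

// Remove deletes id by moving the last element into its slot, so the
// slice order is not preserved.
func (s *idSet) Remove(id string) bool {
	i, ok := s.lookup[id]
	if !ok {
		return false
	}
	last := len(s.ids) - 1
	moved := s.ids[last]
	s.ids[i] = moved
	s.lookup[moved] = i
	s.ids = s.ids[:last]
	delete(s.lookup, id)
	return true
}

func (s *idSet) Len() int { return len(s.ids) }

func main() {
	s := newIDSet()
	s.Add("a")
	s.Add("b")
	s.Add("c")
	fmt.Println(s.Add("a")) // false, already present
	s.Remove("a")
	fmt.Println(s.ids, s.Len()) // [c b] 2
}
```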
type PoisonPill ¶
type PoisonPill struct {
// contains filtered or unexported fields
}
user messages
func (*PoisonPill) AutoReceiveMessage ¶
func (*PoisonPill) AutoReceiveMessage()
AutoReceiveMessage marks PoisonPill as an automatically handled message.
func (*PoisonPill) Descriptor
deprecated
func (*PoisonPill) Descriptor() ([]byte, []int)
Deprecated: Use PoisonPill.ProtoReflect.Descriptor instead.
func (*PoisonPill) ProtoMessage ¶
func (*PoisonPill) ProtoMessage()
func (*PoisonPill) ProtoReflect ¶
func (x *PoisonPill) ProtoReflect() protoreflect.Message
func (*PoisonPill) Reset ¶
func (x *PoisonPill) Reset()
func (*PoisonPill) String ¶
func (x *PoisonPill) String() string
type PriorityMessage ¶
type PriorityMessage interface {
GetPriority() int8
}
type Process ¶
type Process interface {
SendUserMessage(pid *PID, message any)
SendSystemMessage(pid *PID, message any)
Stop(pid *PID)
}
A Process is an interface that defines the base contract for interaction of actors
type ProcessRegistryValue ¶
type ProcessRegistryValue struct {
SequenceID uint64
ActorSystem *ActorSystem
Address string
LocalPIDs *SliceMap
RemoteHandlers []AddressResolver
}
ProcessRegistryValue tracks processes within an actor system.
func NewProcessRegistry ¶
func NewProcessRegistry(actorSystem *ActorSystem) *ProcessRegistryValue
NewProcessRegistry creates a new process registry for the actor system.
func (*ProcessRegistryValue) Add ¶
func (pr *ProcessRegistryValue) Add(process Process, id string) (*PID, bool)
Add registers a process with the given id and returns its PID.
func (*ProcessRegistryValue) Get ¶
func (pr *ProcessRegistryValue) Get(pid *PID) (Process, bool)
Get retrieves a process by PID, checking remote handlers if needed.
func (*ProcessRegistryValue) GetLocal ¶
func (pr *ProcessRegistryValue) GetLocal(id string) (Process, bool)
GetLocal retrieves a local process by id.
func (*ProcessRegistryValue) NextID ¶
func (pr *ProcessRegistryValue) NextID() string
NextID returns the next unique process identifier.
func (*ProcessRegistryValue) RegisterAddressResolver ¶
func (pr *ProcessRegistryValue) RegisterAddressResolver(handler AddressResolver)
RegisterAddressResolver adds a resolver for remote addresses.
func (*ProcessRegistryValue) Remove ¶
func (pr *ProcessRegistryValue) Remove(pid *PID)
Remove deletes the process with the given PID from the registry.
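The registry methods above follow a familiar pattern: a counter for unique ids plus a guarded map. Here is a minimal sketch under assumptions: the registry type is hypothetical, it stores values by string id rather than by PID, and the "$" plus decimal counter id format is invented for the example.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// registry is a hypothetical, much simplified process registry.
type registry struct {
	seq   uint64
	mu    sync.RWMutex
	procs map[string]any
}

func newRegistry() *registry { return &registry{procs: make(map[string]any)} }

// NextID returns a fresh id from an atomic counter (format is an assumption).
func (r *registry) NextID() string {
	n := atomic.AddUint64(&r.seq, 1)
	return "$" + strconv.FormatUint(n, 10)
}

// Add registers p under id and reports false if the id is already taken.
func (r *registry) Add(p any, id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.procs[id]; exists {
		return false
	}
	r.procs[id] = p
	return true
}

// GetLocal looks up a process by id.
func (r *registry) GetLocal(id string) (any, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.procs[id]
	return p, ok
}

// Remove deletes the entry for id, if any.
func (r *registry) Remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.procs, id)
}

func main() {
	r := newRegistry()
	id := r.NextID()
	fmt.Println(id, r.Add("worker", id), r.Add("other", id)) // $1 true false
	r.Remove(id)
	_, ok := r.GetLocal(id)
	fmt.Println(ok) // false
}
```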
type ProducerWithActorSystem ¶
type ProducerWithActorSystem func(system *ActorSystem) Actor
ProducerWithActorSystem creates a new actor instance with access to the actor system.
type Props ¶
type Props struct {
// contains filtered or unexported fields
}
Props represents configuration to define how an actor should be created.
func PropsFromFunc ¶
func PropsFromFunc(f ReceiveFunc, opts ...PropsOption) *Props
PropsFromFunc creates a props with the given receive func assigned as the actor producer.
func PropsFromProducer ¶
func PropsFromProducer(producer Producer, opts ...PropsOption) *Props
PropsFromProducer creates a props with the given actor producer assigned.
func PropsFromProducerWithActorSystem ¶
func PropsFromProducerWithActorSystem(producer ProducerWithActorSystem, opts ...PropsOption) *Props
PropsFromProducerWithActorSystem creates a props with the given actor producer that receives the ActorSystem.
func (*Props) Clone ¶
func (props *Props) Clone(opts ...PropsOption) *Props
func (*Props) Configure ¶
func (props *Props) Configure(opts ...PropsOption) *Props
type PropsOption ¶
type PropsOption func(props *Props)
func WithContextDecorator ¶
func WithContextDecorator(contextDecorator ...ContextDecorator) PropsOption
WithContextDecorator wraps the actor context with custom decorators for cross-cutting concerns.
func WithDispatcher ¶
func WithDispatcher(dispatcher Dispatcher) PropsOption
WithDispatcher sets the dispatcher used for scheduling actor message processing.
func WithFunc ¶
func WithFunc(f ReceiveFunc) PropsOption
WithFunc sets a simple receive function as the actor producer; it is shorthand for WithProducer wrapping a function.
func WithGuardian ¶
func WithGuardian(guardian SupervisorStrategy) PropsOption
WithGuardian sets the guardian strategy for actors spawned at the root level.
func WithMailbox ¶
func WithMailbox(mailbox MailboxProducer) PropsOption
WithMailbox sets the mailbox producer for the actor, controlling message buffering and delivery order.
func WithOnInit ¶
func WithOnInit(init ...func(ctx Context)) PropsOption
WithOnInit sets callback functions invoked when the actor is initialized, before processing messages.
func WithProducer ¶
func WithProducer(p Producer) PropsOption
WithProducer assigns a producer function that creates new actor instances, called on initial spawn and restarts.
func WithReceiverMiddleware ¶
func WithReceiverMiddleware(middleware ...ReceiverMiddleware) PropsOption
WithReceiverMiddleware adds middleware that intercepts incoming messages before the actor receives them.
func WithSenderMiddleware ¶
func WithSenderMiddleware(middleware ...SenderMiddleware) PropsOption
WithSenderMiddleware adds middleware that intercepts outgoing messages before they are sent.
func WithSpawnFunc ¶
func WithSpawnFunc(spawn SpawnFunc) PropsOption
WithSpawnFunc sets a custom function used to spawn the actor process.
func WithSpawnMiddleware ¶
func WithSpawnMiddleware(middleware ...SpawnMiddleware) PropsOption
WithSpawnMiddleware adds middleware that intercepts the actor spawn process.
func WithSupervisor ¶
func WithSupervisor(supervisor SupervisorStrategy) PropsOption
WithSupervisor sets the supervision strategy that determines how child actor failures are handled.
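PropsOption and the With... functions above follow Go's functional options pattern. The sketch below reproduces the pattern with hypothetical names (props, propsOption, withMailboxSize, withOnInit, newProps) and invented fields, so it shows the shape of the API rather than its actual internals.

```go
package main

import "fmt"

// props is a hypothetical configuration struct with unexported fields.
type props struct {
	mailboxSize int
	onInit      []func()
}

// propsOption mutates a props value, mirroring the PropsOption signature.
type propsOption func(p *props)

func withMailboxSize(n int) propsOption {
	return func(p *props) { p.mailboxSize = n }
}

func withOnInit(fns ...func()) propsOption {
	return func(p *props) { p.onInit = append(p.onInit, fns...) }
}

// newProps starts from defaults and applies each option in order,
// so later options override earlier ones.
func newProps(opts ...propsOption) *props {
	p := &props{mailboxSize: 16}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newProps(
		withMailboxSize(128),
		withOnInit(func() { fmt.Println("initialized") }),
	)
	for _, fn := range p.onInit {
		fn()
	}
	fmt.Println(p.mailboxSize) // 128
}
```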
type ReadonlyMessageHeader ¶
type ReadonlyMessageHeader interface {
Get(key string) string
Keys() []string
Length() int
ToMap() map[string]string
}
ReadonlyMessageHeader exposes read-only accessors for a message header.
func UnwrapEnvelopeHeader ¶
func UnwrapEnvelopeHeader(message any) ReadonlyMessageHeader
UnwrapEnvelopeHeader returns the header from an envelope.
type ReceiveFunc ¶
type ReceiveFunc func(c Context)
The ReceiveFunc type is an adapter to allow the use of ordinary functions as actors to process messages
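The adapter idea is the same one used by http.HandlerFunc: a function type gains a method so that it satisfies an interface. A self-contained sketch follows, using hypothetical lower-case stand-ins (context, actor, receiveFunc, staticContext, deliver) for the package's types.

```go
package main

import "fmt"

// context and actor are hypothetical, minimal stand-ins for Context and Actor.
type context interface{ Message() any }

type actor interface{ Receive(c context) }

// receiveFunc adapts an ordinary function to the actor interface.
type receiveFunc func(c context)

// Receive calls the function itself, which is what makes the adapter work.
func (f receiveFunc) Receive(c context) { f(c) }

// staticContext carries a single message for the demonstration.
type staticContext struct{ msg any }

func (s staticContext) Message() any { return s.msg }

// deliver hands msg to any actor, whether a struct or an adapted function.
func deliver(a actor, msg any) { a.Receive(staticContext{msg: msg}) }

func main() {
	var a actor = receiveFunc(func(c context) {
		fmt.Println("got:", c.Message())
	})
	deliver(a, "ping") // got: ping
}
```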
type ReceiveTimeout ¶
type ReceiveTimeout struct{}
A ReceiveTimeout message is sent to an actor after the Context.ReceiveTimeout duration has expired
type ReceiverContext ¶
type ReceiverContext interface {
// contains filtered or unexported methods
}
ReceiverContext provides context for receiving messages.
type ReceiverFunc ¶
type ReceiverFunc func(c ReceiverContext, envelope *MessageEnvelope)
ReceiverFunc processes an incoming message envelope.
type ReceiverMiddleware ¶
type ReceiverMiddleware func(next ReceiverFunc) ReceiverFunc
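A type of the form func(next F) F composes by wrapping: each middleware receives the next handler and returns a new one. The sketch below uses simplified hypothetical types (receiverFunc takes only a message, and chain and tag are invented helpers) and assumes the first middleware listed ends up outermost.

```go
package main

import "fmt"

// receiverFunc and receiverMiddleware are simplified, hypothetical versions
// of the types above.
type receiverFunc func(msg any)

type receiverMiddleware func(next receiverFunc) receiverFunc

// chain wraps last with the middlewares so that the first one listed
// runs first (it becomes the outermost wrapper).
func chain(last receiverFunc, mws ...receiverMiddleware) receiverFunc {
	h := last
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) receiverMiddleware {
	return func(next receiverFunc) receiverFunc {
		return func(msg any) {
			*trace = append(*trace, name)
			next(msg)
		}
	}
}

func main() {
	var trace []string
	h := chain(
		func(msg any) { trace = append(trace, "actor") },
		tag("log", &trace),
		tag("auth", &trace),
	)
	h("hello")
	fmt.Println(trace) // [log auth actor]
}
```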
type Restart ¶
type Restart struct{}
Restart is a message sent by the actor system to control the lifecycle of an actor.
func (*Restart) SystemMessage ¶
func (*Restart) SystemMessage()
SystemMessage marks Restart as a system message.
type RestartStatistics ¶
type RestartStatistics struct {
// contains filtered or unexported fields
}
RestartStatistics keeps track of how many times an actor has restarted and when.
func NewRestartStatistics ¶
func NewRestartStatistics() *RestartStatistics
NewRestartStatistics constructs a RestartStatistics.
func (*RestartStatistics) Fail ¶
func (rs *RestartStatistics) Fail()
Fail increases the associated actor's failure count.
func (*RestartStatistics) FailureCount ¶
func (rs *RestartStatistics) FailureCount() int
FailureCount returns failure count
func (*RestartStatistics) NumberOfFailures ¶
func (rs *RestartStatistics) NumberOfFailures(withinDuration time.Duration) int
NumberOfFailures returns number of failures within a given duration
func (*RestartStatistics) Reset ¶
func (rs *RestartStatistics) Reset()
Reset resets the associated actor's failure count.
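One straightforward way to answer "how many failures within a duration" is to record a timestamp per failure and count the recent ones. The sketch below does that with a hypothetical restartStats type; it takes explicit times so the result is deterministic, and it is not claimed to match the package's internal bookkeeping.

```go
package main

import (
	"fmt"
	"time"
)

// restartStats is a hypothetical tracker that stores one timestamp per failure.
type restartStats struct{ failureTimes []time.Time }

// failAt records a failure that happened at t.
func (rs *restartStats) failAt(t time.Time) {
	rs.failureTimes = append(rs.failureTimes, t)
}

// failureCount returns the total number of recorded failures.
func (rs *restartStats) failureCount() int { return len(rs.failureTimes) }

// numberOfFailuresAt counts failures that occurred less than within before now.
func (rs *restartStats) numberOfFailuresAt(now time.Time, within time.Duration) int {
	n := 0
	for _, t := range rs.failureTimes {
		if now.Sub(t) < within {
			n++
		}
	}
	return n
}

// reset forgets all recorded failures.
func (rs *restartStats) reset() { rs.failureTimes = nil }

// demoRestartStats records three failures and counts those in the last 10s.
func demoRestartStats() (total, recent, afterReset int) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	rs := &restartStats{}
	rs.failAt(now.Add(-10 * time.Minute))
	rs.failAt(now.Add(-5 * time.Second))
	rs.failAt(now.Add(-1 * time.Second))
	total = rs.failureCount()
	recent = rs.numberOfFailuresAt(now, 10*time.Second)
	rs.reset()
	return total, recent, rs.failureCount()
}

func main() {
	fmt.Println(demoRestartStats()) // 3 2 0
}
```

A supervisor strategy can compare the windowed count against a retry limit to decide whether to keep restarting a child.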
type Restarting ¶
type Restarting struct{}
A Restarting message is sent to an actor when the actor is being restarted by the system due to a failure
func (*Restarting) AutoReceiveMessage ¶
func (*Restarting) AutoReceiveMessage()
AutoReceiveMessage marks Restarting as an automatically handled message.
type ResumeMailbox ¶
type ResumeMailbox struct{}
ResumeMailbox is a message sent by the actor system to resume mailbox processing.
This will not be forwarded to the Receive method
func (*ResumeMailbox) MailboxMessage ¶
func (*ResumeMailbox) MailboxMessage()
MailboxMessage implements the MailboxMessage interface.
type RootContext ¶
type RootContext struct {
// contains filtered or unexported fields
}
RootContext is the entry point used to interact with the actor system.
func NewRootContext ¶
func NewRootContext(actorSystem *ActorSystem, header map[string]string, middleware ...SenderMiddleware) *RootContext
func (*RootContext) Actor ¶
func (rc *RootContext) Actor() Actor
Actor always returns nil for RootContext since it does not represent a real actor.
func (*RootContext) ActorSystem ¶
func (rc *RootContext) ActorSystem() *ActorSystem
func (RootContext) Copy ¶
func (rc RootContext) Copy() *RootContext
func (*RootContext) Logger ¶
func (rc *RootContext) Logger() *slog.Logger
func (*RootContext) Message ¶
func (rc *RootContext) Message() any
Message always returns nil for RootContext since it is not processing a message.
func (*RootContext) MessageHeader ¶
func (rc *RootContext) MessageHeader() ReadonlyMessageHeader
MessageHeader returns the headers attached to the current context.
func (*RootContext) Parent ¶
func (rc *RootContext) Parent() *PID
Parent returns the PID of the parent actor. RootContext has no parent and returns nil.
func (*RootContext) Poison ¶
func (rc *RootContext) Poison(pid *PID)
Poison tells the actor to stop after it has processed the user messages currently in its mailbox.
func (*RootContext) PoisonFuture ¶
func (rc *RootContext) PoisonFuture(pid *PID) Future
PoisonFuture tells the actor to stop after it has processed the user messages currently in its mailbox, and returns a Future for the stop.
func (*RootContext) Request ¶
func (rc *RootContext) Request(pid *PID, message any)
Request sends a message to the given PID expecting a response.
func (*RootContext) RequestFuture ¶
RequestFuture sends a message to a given PID and returns a Future.
func (*RootContext) RequestWithCustomSender ¶
func (rc *RootContext) RequestWithCustomSender(pid *PID, message any, sender *PID)
RequestWithCustomSender sends a message on behalf of the provided sender PID.
func (*RootContext) Self ¶
func (rc *RootContext) Self() *PID
Self returns the PID representing the root context.
func (*RootContext) Send ¶
func (rc *RootContext) Send(pid *PID, message any)
Send delivers a message to the given PID using the configured middleware chain.
func (*RootContext) Sender ¶
func (rc *RootContext) Sender() *PID
Sender returns the PID of the message sender if available.
func (*RootContext) Spawn ¶
func (rc *RootContext) Spawn(props *Props) *PID
Spawn starts a new actor based on props and named with a unique id.
Example ¶
Spawn creates instances of actors, similar to 'new' or 'make' but for actors.
var wg sync.WaitGroup
wg.Add(1)
// create the actor system; its Root field is the root context
system := actor.NewActorSystem()
// define the actor props.
// props define the creation process of an actor
props := actor.PropsFromFunc(func(ctx actor.Context) {
// check if the message is a *actor.Started message
// this is the first message all actors get
// actor.Started is received asynchronously and can be used
// to initialize your actor's initial state
if _, ok := ctx.Message().(*actor.Started); ok {
fmt.Println("hello world")
wg.Done()
}
})
// spawn the actor based on the props
system.Root.Spawn(props)
wg.Wait()
Output: hello world
func (*RootContext) SpawnNamed ¶
func (rc *RootContext) SpawnNamed(props *Props, name string) (*PID, error)
SpawnNamed starts a new actor based on props and named using the specified name.
ErrNameExists will be returned if the id already exists.
Do not use names that share a pattern with system actors, for example "YourPrefix$1", "Remote$1", "future$1".
Example ¶
SpawnNamed creates named instances of actors, similar to 'new' or 'make' but for actors.
var wg sync.WaitGroup
wg.Add(1)
// create the actor system and take its root context
system := actor.NewActorSystem()
context := system.Root
// define the actor props.
// props define the creation process of an actor
props := actor.PropsFromFunc(func(ctx actor.Context) {
// check if the message is a *actor.Started message
// this is the first message all actors get
// actor.Started is received asynchronously and can be used
// to initialize your actor's initial state
if _, ok := ctx.Message().(*actor.Started); ok {
fmt.Println("hello world")
wg.Done()
}
})
// spawn the actor based on the props
_, err := context.SpawnNamed(props, "my-actor")
if err != nil {
log.Fatal("The actor name is already in use")
}
wg.Wait()
Output: hello world
func (*RootContext) SpawnPrefix ¶
func (rc *RootContext) SpawnPrefix(props *Props, prefix string) *PID
SpawnPrefix starts a new actor based on props and named using a prefix followed by a unique id.
func (*RootContext) Stop ¶
func (rc *RootContext) Stop(pid *PID)
Stop stops the actor immediately, regardless of any user messages still in its mailbox.
func (*RootContext) StopFuture ¶
func (rc *RootContext) StopFuture(pid *PID) Future
StopFuture stops the actor immediately, regardless of any user messages still in its mailbox, and returns a Future for the stop.
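The difference between Poison and Stop comes down to whether queued user messages are drained first. The toy mailbox below illustrates the contrast; the mailbox and poisonPill types are hypothetical, and the model is a simplification that assumes a poison request behaves like a message appended to the end of the user queue.

```go
package main

import "fmt"

// poisonPill is a hypothetical sentinel queued behind existing user messages.
type poisonPill struct{}

// mailbox is a toy single-threaded mailbox used only for illustration.
type mailbox struct {
	queue   []any
	stopped bool
	handled []any
}

func (m *mailbox) send(msg any) { m.queue = append(m.queue, msg) }

// poison queues a stop request behind the messages already waiting.
func (m *mailbox) poison() { m.send(poisonPill{}) }

// stop takes effect immediately and leaves queued messages unprocessed.
func (m *mailbox) stop() { m.stopped = true }

// run processes messages in order until the mailbox is stopped or empty.
func (m *mailbox) run() {
	for !m.stopped && len(m.queue) > 0 {
		msg := m.queue[0]
		m.queue = m.queue[1:]
		if _, ok := msg.(poisonPill); ok {
			m.stopped = true
			return
		}
		m.handled = append(m.handled, msg)
	}
}

func main() {
	a := &mailbox{}
	a.send("m1")
	a.send("m2")
	a.poison()
	a.run()

	b := &mailbox{}
	b.send("m1")
	b.send("m2")
	b.stop()
	b.run()

	fmt.Println(len(a.handled), len(b.handled)) // 2 0
}
```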
func (*RootContext) TrySpawn ¶
func (rc *RootContext) TrySpawn(props *Props) (*PID, error)
TrySpawn starts a new actor based on props and named with a unique id. Unlike Spawn, it returns an error instead of panicking on failure.
func (*RootContext) TrySpawnPrefix ¶
func (rc *RootContext) TrySpawnPrefix(props *Props, prefix string) (*PID, error)
TrySpawnPrefix starts a new actor based on props and named using a prefix followed by a unique id. Unlike SpawnPrefix, it returns an error instead of panicking on failure.
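The Try variants follow a common Go pairing: one function returns an error, and a convenience wrapper panics on that error. A sketch of the pairing is below; the spawner type is hypothetical, and a name collision stands in for whatever failure the real spawn can hit.

```go
package main

import (
	"errors"
	"fmt"
)

// errNameExists is a hypothetical failure used to trigger the error path.
var errNameExists = errors.New("name already exists")

// spawner is a toy stand-in that only tracks which names are taken.
type spawner struct{ names map[string]bool }

// trySpawn reports failure through its error result.
func (s *spawner) trySpawn(name string) (string, error) {
	if s.names[name] {
		return "", errNameExists
	}
	s.names[name] = true
	return name, nil
}

// spawn is the convenience form: it panics where trySpawn returns an error.
func (s *spawner) spawn(name string) string {
	id, err := s.trySpawn(name)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	s := &spawner{names: map[string]bool{}}
	fmt.Println(s.spawn("worker"))
	if _, err := s.trySpawn("worker"); err != nil {
		fmt.Println("spawn failed:", err)
	}
}
```

The error-returning form suits callers that can recover, for example by retrying or reporting the failure, while the panicking form keeps setup code short.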
func (*RootContext) WithGuardian ¶
func (rc *RootContext) WithGuardian(guardian SupervisorStrategy) *RootContext
WithGuardian sets a guardian strategy used when spawning actors.
func (*RootContext) WithHeaders ¶
func (rc *RootContext) WithHeaders(headers map[string]string) *RootContext
WithHeaders sets the message headers used for subsequent sends and requests.
func (*RootContext) WithSenderMiddleware ¶
func (rc *RootContext) WithSenderMiddleware(middleware ...SenderMiddleware) *RootContext
WithSenderMiddleware applies sender middleware to outgoing messages.
func (*RootContext) WithSpawnMiddleware ¶
func (rc *RootContext) WithSpawnMiddleware(middleware ...SpawnMiddleware) *RootContext
WithSpawnMiddleware applies spawn middleware when creating child actors.
type SenderContext ¶
type SenderContext interface {
// contains filtered or unexported methods
}
SenderContext provides context for sending messages.
type SenderFunc ¶
type SenderFunc func(c SenderContext, target *PID, envelope *MessageEnvelope)
SenderFunc processes an outgoing message envelope before delivery.
type SenderMiddleware ¶
type SenderMiddleware func(next SenderFunc) SenderFunc
type ShouldThrottle ¶
type ShouldThrottle func() Valve
func NewThrottle ¶
func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBack func(int32)) ShouldThrottle
NewThrottle returns a ShouldThrottle function. There is no guarantee that the throttle opens exactly after the period, since it is reset asynchronously; throughput has been prioritized over exact re-opening. throttledCallBack is called after the period with the number of events that were throttled.
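A throttle of this kind can be sketched with an atomic counter and a timer that resets it asynchronously. Everything below is an illustration under assumptions: the valve states (open, closing, closed), the newThrottle helper, and the rule that the valve reports closing on the event that reaches the limit are hypothetical choices, not the package's definitions.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// valve is a hypothetical throttle state.
type valve int32

const (
	open    valve = iota // event is allowed
	closing              // event is allowed, but the limit has been reached
	closed               // event should be throttled
)

// newThrottle allows maxEvents per period. The counter is reset by a timer
// started on the first event of a window, so re-opening is only approximate.
func newThrottle(maxEvents int32, period time.Duration, onThrottled func(int32)) func() valve {
	var current int32
	return func() valve {
		tries := atomic.AddInt32(&current, 1)
		if tries == 1 {
			time.AfterFunc(period, func() {
				seen := atomic.SwapInt32(&current, 0)
				if seen > maxEvents && onThrottled != nil {
					onThrottled(seen - maxEvents) // number of throttled events
				}
			})
		}
		switch {
		case tries == maxEvents:
			return closing
		case tries > maxEvents:
			return closed
		default:
			return open
		}
	}
}

// demoThrottle calls a throttle three times within one long period.
func demoThrottle() []valve {
	shouldThrottle := newThrottle(2, time.Hour, nil)
	return []valve{shouldThrottle(), shouldThrottle(), shouldThrottle()}
}

func main() {
	fmt.Println(demoThrottle()) // [0 1 2], i.e. open, closing, closed
}
```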
type SliceMap ¶
type SliceMap struct {
LocalPIDs []cmap.ConcurrentMap
}
SliceMap is a sharded map for storing local PIDs.
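Sharding spreads keys over several independently locked maps so that concurrent writers rarely contend on the same lock. The sketch below shows the general technique with hypothetical types (shard, shardedMap) and FNV-1a as an assumed hash; the package itself stores cmap.ConcurrentMap shards and may pick shards differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shard is one independently locked partition.
type shard struct {
	mu    sync.RWMutex
	items map[string]any
}

// shardedMap is a hypothetical sharded map keyed by string.
type shardedMap struct{ shards []*shard }

// newShardedMap creates n shards; n must be greater than zero.
func newShardedMap(n int) *shardedMap {
	m := &shardedMap{shards: make([]*shard, n)}
	for i := range m.shards {
		m.shards[i] = &shard{items: make(map[string]any)}
	}
	return m
}

// shardFor hashes the key to choose a shard.
func (m *shardedMap) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m.shards[h.Sum32()%uint32(len(m.shards))]
}

// Set stores v under key, locking only the key's shard.
func (m *shardedMap) Set(key string, v any) {
	s := m.shardFor(key)
	s.mu.Lock()
	s.items[key] = v
	s.mu.Unlock()
}

// Get reads the value for key from its shard.
func (m *shardedMap) Get(key string) (any, bool) {
	s := m.shardFor(key)
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

func main() {
	m := newShardedMap(4)
	m.Set("actor-1", "process-1")
	v, ok := m.Get("actor-1")
	fmt.Println(v, ok) // process-1 true
	_, ok = m.Get("missing")
	fmt.Println(ok) // false
}
```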
type SpawnFunc ¶
type SpawnFunc func(actorSystem *ActorSystem, id string, props *Props, parentContext SpawnerContext) (*PID, error)
var DefaultSpawner SpawnFunc = defaultSpawner
DefaultSpawner is a workaround that gives Proto.Router access to the default spawner func.
type SpawnMiddleware ¶
type SpawnerContext ¶
type SpawnerContext interface {
// contains filtered or unexported methods
}
SpawnerContext provides context for spawning child actors.
type Started ¶
type Started struct{}
A Started message is sent to an actor once it has been started and is ready to begin receiving messages.
func (*Started) SystemMessage ¶
func (*Started) SystemMessage()
SystemMessage marks Started as a system message.
type Stop ¶
type Stop struct {
// contains filtered or unexported fields
}
func (*Stop) Descriptor
deprecated
func (*Stop) ProtoMessage ¶
func (*Stop) ProtoMessage()
func (*Stop) ProtoReflect ¶
func (x *Stop) ProtoReflect() protoreflect.Message
func (*Stop) SystemMessage ¶
func (*Stop) SystemMessage()
SystemMessage marks Stop as a system message.
type Stopped ¶
type Stopped struct{}
A Stopped message is sent to the actor once it has been stopped. A stopped actor will receive no further messages
func (*Stopped) AutoReceiveMessage ¶
func (*Stopped) AutoReceiveMessage()
AutoReceiveMessage marks Stopped as an automatically handled message.
type Stopping ¶
type Stopping struct{}
A Stopping message is sent to an actor prior to the actor being stopped
func (*Stopping) AutoReceiveMessage ¶
func (*Stopping) AutoReceiveMessage()
AutoReceiveMessage marks Stopping as an automatically handled message.
type Supervisor ¶
type Supervisor interface {
Children() []*PID
EscalateFailure(reason any, message any)
RestartChildren(pids ...*PID)
StopChildren(pids ...*PID)
ResumeChildren(pids ...*PID)
}
Supervisor is an interface that is used by the SupervisorStrategy to manage child actor lifecycle
type SupervisorEvent ¶
SupervisorEvent is sent on the EventStream when a supervisor has applied a directive to a failing child actor.
type SupervisorStrategy ¶
type SupervisorStrategy interface {
HandleFailure(actorSystem *ActorSystem, supervisor Supervisor, child *PID, rs *RestartStatistics, reason any, message any)
}
SupervisorStrategy is an interface that decides how to handle failing child actors
func DefaultSupervisorStrategy ¶
func DefaultSupervisorStrategy() SupervisorStrategy
func NewAllForOneStrategy ¶
func NewAllForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
NewAllForOneStrategy returns a new SupervisorStrategy which applies the given fault Directive from the decider to the failing child and all of the supervisor's other children.
This strategy is appropriate when the children have a strong dependency, such that any single one failing would place them all into a potentially invalid state.
func NewExponentialBackoffStrategy ¶
func NewExponentialBackoffStrategy(backoffWindow time.Duration, initialBackoff time.Duration) SupervisorStrategy
NewExponentialBackoffStrategy creates a new Supervisor strategy that restarts a faulting child using an exponential back off algorithm:
delay =
func NewOneForOneStrategy ¶
func NewOneForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
NewOneForOneStrategy returns a new Supervisor strategy which applies the fault Directive from the decider to the failing child process.
This strategy is applicable if it is safe to handle a single child in isolation from its peers or dependents.
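The practical difference between the two strategies is the set of children a directive is applied to. The sketch below makes that explicit with two hypothetical helper functions operating on child names; it assumes all-for-one targets every child of the supervisor and leaves out the decider and retry limits entirely.

```go
package main

import "fmt"

// oneForOneTargets returns only the failing child.
func oneForOneTargets(children []string, failing string) []string {
	return []string{failing}
}

// allForOneTargets returns every child of the supervisor, on the assumption
// that one failure may have left the siblings in an invalid state.
func allForOneTargets(children []string, failing string) []string {
	targets := make([]string, len(children))
	copy(targets, children)
	return targets
}

func main() {
	children := []string{"db", "cache", "api"}
	fmt.Println(oneForOneTargets(children, "cache")) // [cache]
	fmt.Println(allForOneTargets(children, "cache")) // [db cache api]
}
```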
func NewRestartingStrategy ¶
func NewRestartingStrategy() SupervisorStrategy
func RestartingSupervisorStrategy ¶
func RestartingSupervisorStrategy() SupervisorStrategy
type SuspendMailbox ¶
type SuspendMailbox struct{}
SuspendMailbox is a message sent by the actor system to suspend mailbox processing.
This will not be forwarded to the Receive method
func (*SuspendMailbox) MailboxMessage ¶
func (*SuspendMailbox) MailboxMessage()
MailboxMessage implements the MailboxMessage interface.
type SystemMessage ¶
type SystemMessage interface {
SystemMessage()
}
A SystemMessage message is reserved for specific lifecycle messages used by the actor system
type Terminated ¶
type Terminated struct {
Who *PID `protobuf:"bytes,1,opt,name=who,proto3" json:"who,omitempty"`
Why TerminatedReason `protobuf:"varint,2,opt,name=Why,proto3,enum=actor.TerminatedReason" json:"Why,omitempty"`
// contains filtered or unexported fields
}
func (*Terminated) Descriptor
deprecated
func (*Terminated) Descriptor() ([]byte, []int)
Deprecated: Use Terminated.ProtoReflect.Descriptor instead.
func (*Terminated) GetWho ¶
func (x *Terminated) GetWho() *PID
func (*Terminated) GetWhy ¶
func (x *Terminated) GetWhy() TerminatedReason
func (*Terminated) ProtoMessage ¶
func (*Terminated) ProtoMessage()
func (*Terminated) ProtoReflect ¶
func (x *Terminated) ProtoReflect() protoreflect.Message
func (*Terminated) Reset ¶
func (x *Terminated) Reset()
func (*Terminated) String ¶
func (x *Terminated) String() string
func (*Terminated) SystemMessage ¶
func (*Terminated) SystemMessage()
SystemMessage marks Terminated as a system message.
type TerminatedReason ¶
type TerminatedReason int32
const ( TerminatedReason_Stopped TerminatedReason = 0 TerminatedReason_AddressTerminated TerminatedReason = 1 TerminatedReason_NotFound TerminatedReason = 2 )
func (TerminatedReason) Descriptor ¶
func (TerminatedReason) Descriptor() protoreflect.EnumDescriptor
func (TerminatedReason) Enum ¶
func (x TerminatedReason) Enum() *TerminatedReason
func (TerminatedReason) EnumDescriptor
deprecated
func (TerminatedReason) EnumDescriptor() ([]byte, []int)
Deprecated: Use TerminatedReason.Descriptor instead.
func (TerminatedReason) Number ¶
func (x TerminatedReason) Number() protoreflect.EnumNumber
func (TerminatedReason) String ¶
func (x TerminatedReason) String() string
func (TerminatedReason) Type ¶
func (TerminatedReason) Type() protoreflect.EnumType
type Touch ¶
type Touch struct {
// contains filtered or unexported fields
}
func (*Touch) Descriptor
deprecated
func (*Touch) GetAutoResponse ¶
GetAutoResponse returns the auto-response for a Touch message.
func (*Touch) ProtoMessage ¶
func (*Touch) ProtoMessage()
func (*Touch) ProtoReflect ¶
func (x *Touch) ProtoReflect() protoreflect.Message
type Touched ¶
type Touched struct {
Who *PID `protobuf:"bytes,1,opt,name=who,proto3" json:"who,omitempty"`
// contains filtered or unexported fields
}
func (*Touched) Descriptor
deprecated
func (*Touched) ProtoMessage ¶
func (*Touched) ProtoMessage()
func (*Touched) ProtoReflect ¶
func (x *Touched) ProtoReflect() protoreflect.Message
type Unwatch ¶
type Unwatch struct {
Watcher *PID `protobuf:"bytes,1,opt,name=Watcher,proto3" json:"Watcher,omitempty"`
// contains filtered or unexported fields
}
func (*Unwatch) Descriptor
deprecated
func (*Unwatch) GetWatcher ¶
func (*Unwatch) ProtoMessage ¶
func (*Unwatch) ProtoMessage()
func (*Unwatch) ProtoReflect ¶
func (x *Unwatch) ProtoReflect() protoreflect.Message
func (*Unwatch) SystemMessage ¶
func (*Unwatch) SystemMessage()
SystemMessage marks Unwatch as a system message.
type Watch ¶
type Watch struct {
Watcher *PID `protobuf:"bytes,1,opt,name=Watcher,proto3" json:"Watcher,omitempty"`
// contains filtered or unexported fields
}
system messages
func (*Watch) Descriptor
deprecated
func (*Watch) GetWatcher ¶
func (*Watch) ProtoMessage ¶
func (*Watch) ProtoMessage()
func (*Watch) ProtoReflect ¶
func (x *Watch) ProtoReflect() protoreflect.Message
func (*Watch) SystemMessage ¶
func (*Watch) SystemMessage()
SystemMessage marks Watch as a system message.
Source Files
¶
- actor.pb.go
- actor_context.go
- actor_process.go
- actor_system.go
- auto_respond.go
- behavior.go
- bounded.go
- captured_context.go
- child_restart_stats.go
- config.go
- config_opts.go
- context.go
- deadletter.go
- deduplication_context.go
- directive.go
- directive_string.go
- dispatcher.go
- doc.go
- eventstream_process.go
- future.go
- guardian.go
- mailbox.go
- message.go
- message_batch.go
- message_envelope.go
- messages.go
- messagetype.go
- metrics.go
- middleware_chain.go
- pid.go
- pidset.go
- priority_queue.go
- process.go
- process_registry.go
- props.go
- props_opts.go
- queue.go
- ring_buffer.go
- root_context.go
- stash.go
- strategy_all_for_one.go
- strategy_exponential_backoff.go
- strategy_one_for_one.go
- strategy_restarting.go
- supervision.go
- supervision_event.go
- supervision_metrics.go
- throttler.go
- unbounded.go
- unbounded_lock_free.go
- unbounded_priority.go
Directories
¶
| Path | Synopsis |
|---|---|
| middleware | Package middleware provides reusable actor middleware components. |
| propagator | Package propagator offers utilities for forwarding middleware and decorators. |
| protozip | Package protozip demonstrates middleware utilities for propagating Zipkin tracing headers through Proto.Actor messages. |
| opentelemetry (module) | |