Documentation ¶
Index ¶
- Variables
- func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *pb.Address, err error)
- func RemoteSendAsync(ctx context.Context, to *pb.Address, message proto.Message) error
- func RemoteSendSync(ctx context.Context, to *pb.Address, message proto.Message) (response proto.Message, err error)
- func SendAsync(ctx context.Context, to PID, message proto.Message) error
- func SendSync(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
- type Actor
- type ActorSystem
- type Behavior
- type BehaviorStack
- type Config
- type LocalAddress
- func (a *LocalAddress) Host() string
- func (a *LocalAddress) HostPort() string
- func (a *LocalAddress) Parse(address string) *LocalAddress
- func (a *LocalAddress) Port() int
- func (a *LocalAddress) Protocol() string
- func (a *LocalAddress) String() string
- func (a *LocalAddress) System() string
- func (a *LocalAddress) WithHost(host string) *LocalAddress
- func (a *LocalAddress) WithPort(port int) *LocalAddress
- func (a *LocalAddress) WithProtocol(protocol string) *LocalAddress
- func (a *LocalAddress) WithSystem(system string) *LocalAddress
- type Option
- func WithActorInitMaxRetries(max int) Option
- func WithExpireActorAfter(duration time.Duration) Option
- func WithLogger(logger log.Logger) Option
- func WithPassivationDisabled() Option
- func WithRemoting() Option
- func WithReplyTimeout(timeout time.Duration) Option
- func WithSupervisorStrategy(strategy StrategyDirective) Option
- func WithTelemetry(telemetry *telemetry.Telemetry) Option
- type OptionFunc
- type PID
- type Path
- type ReceiveContext
- type Reflection
- type StrategyDirective
- type TypesLoader
- type Unit
Constants ¶
This section is empty.
Variables ¶
var ( ErrNameRequired = errors.New("actor system is required") ErrNodeAddrRequired = errors.New("actor system node address is required") )
var ( ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')") ErrNotReady = errors.New("actor is not ready") ErrUnhandled = errors.New("unhandled message") ErrMissingConfig = errors.New("config is missing") ErrUndefinedActor = errors.New("actor is not defined") ErrRequestTimeout = errors.New("request timed out") ErrEmptyBehavior = errors.New("no behavior defined") ErrRemoteSendInvalidActorSystem = status.Error(codes.FailedPrecondition, "invalid actor system") // nolint ErrRemoteSendInvalidNode = status.Error(codes.FailedPrecondition, "invalid actor system node") ErrRemoteActorNotFound = func(addr string) error { return status.Errorf(codes.NotFound, "remote actor=%s not found", addr) } ErrRemoteSendFailure = func(err error) error { return status.Error(codes.Internal, err.Error()) } ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface") ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance") ErrTypeNotFound = func(typeName string) error { return fmt.Errorf("typeName=%s not found", typeName) } )
var RemoteNoSender = new(pb.Address)
RemoteNoSender means that there is no sender
Functions ¶
func RemoteLookup ¶
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *pb.Address, err error)
RemoteLookup look for an actor address on a remote node.
func RemoteSendAsync ¶
RemoteSendAsync sends a message to an actor remotely without expecting any reply
func RemoteSendSync ¶
func RemoteSendSync(ctx context.Context, to *pb.Address, message proto.Message) (response proto.Message, err error)
RemoteSendSync sends a synchronous message to another actor remotely and expect a response.
Types ¶
type Actor ¶
type Actor interface { // PreStart pre-starts the actor. This function can be used to set up some database connections // or some sort of initialization before the actor start processing public PreStart(ctx context.Context) error // Receive processes any message dropped into the actor mailbox. // The receiver of any message can either reply to the sender of the message with a new message or reply to the message synchronously // by config the reply of the message. The latter approach is often used when an external service is communicating to the actor. // One thing to know is that actor can communicate synchronously as well, just that will hinder the performance of the system. Receive(ctx ReceiveContext) // PostStop is executed when the actor is shutting down. // The execution happens when every public that have not been processed yet will be processed before the actor shutdowns PostStop(ctx context.Context) error }
Actor represents the Actor interface This will be implemented by any user who wants to create an actor
type ActorSystem ¶
type ActorSystem interface { // Name returns the actor system name Name() string // NodeAddr returns the node where the actor system is running NodeAddr() string // Actors returns the list of Actors that are alive in the actor system Actors() []PID // Host returns the actor system host address Host() string // Port returns the actor system host port Port() int // Start starts the actor system Start(ctx context.Context) error // Stop stops the actor system Stop(ctx context.Context) error // StartActor creates an actor in the system and starts it StartActor(ctx context.Context, name string, actor Actor) PID // StopActor stops a given actor in the system StopActor(ctx context.Context, name string) error // RestartActor restarts a given actor in the system RestartActor(ctx context.Context, name string) (PID, error) // NumActors returns the total number of active actors in the system NumActors() uint64 // contains filtered or unexported methods }
ActorSystem defines the contract of an actor system
func NewActorSystem ¶
func NewActorSystem(config *Config) (ActorSystem, error)
NewActorSystem creates an instance of ActorSystem
type BehaviorStack ¶
type BehaviorStack []Behavior
BehaviorStack defines a stack of Behavior
func NewBehaviorStack ¶
func NewBehaviorStack() BehaviorStack
NewBehaviorStack creates an instance of BehaviorStack
func (*BehaviorStack) IsEmpty ¶
func (b *BehaviorStack) IsEmpty() bool
IsEmpty checks if stack is empty
func (*BehaviorStack) Peek ¶
func (b *BehaviorStack) Peek() (behavior Behavior, ok bool)
Peek helps view the top item on the stack
func (*BehaviorStack) Pop ¶
func (b *BehaviorStack) Pop() (behavior Behavior, ok bool)
Pop removes and return top element of stack. Return false if stack is empty.
func (*BehaviorStack) Push ¶
func (b *BehaviorStack) Push(behavior Behavior)
Push a new value onto the stack
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config represents the actor system configuration
func (Config) ActorInitMaxRetries ¶
ActorInitMaxRetries returns the actor init max retries
func (Config) ExpireActorAfter ¶
ExpireActorAfter returns the expireActorAfter
func (Config) HostAndPort ¶
HostAndPort returns the host and the port
func (Config) NodeHostAndPort ¶
NodeHostAndPort returns the node host and port
func (Config) ReplyTimeout ¶
ReplyTimeout returns the reply timeout
type LocalAddress ¶
type LocalAddress struct {
// contains filtered or unexported fields
}
LocalAddress represents the physical location under which an Actor can be reached. Examples are local addresses, identified by the ActorSystem’s name, and remote addresses, identified by protocol, host and port.
func NewLocalAddress ¶
func NewLocalAddress(protocol string, system string, host string, port int) *LocalAddress
NewLocalAddress creates an instance of LocalAddress
func (*LocalAddress) HostPort ¶
func (a *LocalAddress) HostPort() string
HostPort returns the host and port in the following string format @host:port
func (*LocalAddress) Parse ¶
func (a *LocalAddress) Parse(address string) *LocalAddress
Parse parses a new LocalAddress from a given string
func (*LocalAddress) Protocol ¶
func (a *LocalAddress) Protocol() string
Protocol returns the protocol
func (*LocalAddress) String ¶
func (a *LocalAddress) String() string
String returns the canonical String representation of this LocalAddress formatted as: `protocol://system@host:port`
func (*LocalAddress) System ¶
func (a *LocalAddress) System() string
System returns the actor system name
func (*LocalAddress) WithHost ¶
func (a *LocalAddress) WithHost(host string) *LocalAddress
WithHost sets the hosts of a given LocalAddress and returns a new instance of the address
func (*LocalAddress) WithPort ¶
func (a *LocalAddress) WithPort(port int) *LocalAddress
WithPort sets the port of a given LocalAddress and returns a new instance of the address
func (*LocalAddress) WithProtocol ¶
func (a *LocalAddress) WithProtocol(protocol string) *LocalAddress
WithProtocol sets the protocol of a given LocalAddress and returns a new instance of the address
func (*LocalAddress) WithSystem ¶
func (a *LocalAddress) WithSystem(system string) *LocalAddress
WithSystem sets the actor system of a given LocalAddress and returns a new instance of the address
type Option ¶
type Option interface { // Apply sets the Option value of a config. Apply(config *Config) }
Option is the interface that applies a configuration option.
func WithActorInitMaxRetries ¶
WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithExpireActorAfter ¶
WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system
func WithLogger ¶
WithLogger sets the actor system custom logger
func WithPassivationDisabled ¶
func WithPassivationDisabled() Option
WithPassivationDisabled disable the passivation mode
func WithReplyTimeout ¶
WithReplyTimeout sets how long in seconds an actor should reply a command in a receive-reply pattern
func WithSupervisorStrategy ¶
func WithSupervisorStrategy(strategy StrategyDirective) Option
WithSupervisorStrategy sets the supervisor strategy
func WithTelemetry ¶
WithTelemetry sets the custom telemetry
type OptionFunc ¶
type OptionFunc func(*Config)
OptionFunc implements the Option interface.
func (OptionFunc) Apply ¶
func (f OptionFunc) Apply(c *Config)
type PID ¶
type PID interface { // Shutdown gracefully shuts down the given actor Shutdown(ctx context.Context) error // IsOnline returns true when the actor is online ready to process public and false // when the actor is stopped or not started at all IsOnline() bool // ReceivedCount returns the total number of public processed by the actor // at a given point in time while the actor heart is still beating ReceivedCount(ctx context.Context) uint64 // ErrorsCount returns the total number of panic attacks that occur while the actor is processing public // at a given point in time while the actor heart is still beating ErrorsCount(ctx context.Context) uint64 // SpawnChild creates a child actor SpawnChild(ctx context.Context, name string, actor Actor) (PID, error) // Restart restarts the actor Restart(ctx context.Context) error // Watch an actor Watch(pid PID) // UnWatch stops watching a given actor UnWatch(pid PID) // ActorSystem returns the underlying actor system ActorSystem() ActorSystem // ActorPath returns the path of the actor ActorPath() *Path // Tell sends an asynchronous message to another PID Tell(ctx context.Context, to PID, message proto.Message) error // Ask sends a synchronous message to another actor and expect a response. Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) // RemoteTell sends a message to an actor remotely without expecting any reply RemoteTell(ctx context.Context, to *pb.Address, message proto.Message) error // RemoteAsk is used to send a message to an actor remotely and expect a response // immediately. With this type of message the receiver cannot communicate back to Sender // except reply the message with a response. This one-way communication. RemoteAsk(ctx context.Context, to *pb.Address, message proto.Message) (response proto.Message, err error) // RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done // using the same actor system as the PID actor system RemoteLookup(ctx context.Context, host string, port int, name string) (addr *pb.Address, err error) // RestartCount returns the number of times the actor has restarted RestartCount(ctx context.Context) uint64 // MailboxSize returns the mailbox size a given time MailboxSize(ctx context.Context) uint64 // Children returns the list of all the children of the given actor Children(ctx context.Context) []PID // contains filtered or unexported methods }
PID defines the various actions one can perform on a given actor
var NoSender PID
NoSender means that there is no sender
type Path ¶
type Path struct {
// contains filtered or unexported fields
}
Path is a unique path to an actor
func NewPath ¶
func NewPath(name string, address *LocalAddress) *Path
NewPath creates an immutable Path
func (*Path) LocalAddress ¶
func (a *Path) LocalAddress() *LocalAddress
LocalAddress return the actor path local address
func (*Path) RemoteAddress ¶
RemoteAddress returns the remote from path
func (*Path) WithParent ¶
WithParent sets the parent actor path and returns a new path This function is immutable
type ReceiveContext ¶
type ReceiveContext interface { // Context returns the context attached to the message Context() context.Context // Sender of the message Sender() PID // Self represents the actor receiving the message. Self() PID // Message is the actual message sent Message() proto.Message // Response sets the message response // Use this method within the Actor.Receive method of the actor to sets a reply // This can only be used when we are request-response pattern. When it is an async commnunication // this operation will amount to nothing. Response(resp proto.Message) // Become switch the current behavior of the actor to a new behavior // The current message in process during the transition will still be processed with the current // behavior before the transition. However, subsequent public will be processed with the new behavior. // One needs to call UnBecome to reset the actor behavior to the default one which is the Actor.Receive method // which is the default behavior. Become(behavior Behavior) // UnBecome reset the actor behavior to the default one which is the // Actor.Receive method UnBecome() // BecomeStacked sets a new behavior to the actor. // The current message in process during the transition will still be processed with the current // behavior before the transition. However, subsequent public will be processed with the new behavior. // One needs to call UnBecomeStacked to go the previous the actor's behavior. // which is the default behavior. BecomeStacked(behavior Behavior) // UnBecomeStacked sets the actor behavior to the previous behavior before BecomeStacked was called UnBecomeStacked() }
ReceiveContext is the context that is used by the actor to receive public
type Reflection ¶
type Reflection interface { // ActorOf creates a new instance of Actor from its concrete type ActorOf(rtype reflect.Type) (actor Actor, err error) // ActorFrom creates a new instance of Actor from its FQN ActorFrom(name string) (actor Actor, err error) }
Reflection helps create an instance dynamically
func NewReflection ¶
func NewReflection(loader TypesLoader) Reflection
NewReflection creates an instance of Reflection
type StrategyDirective ¶
type StrategyDirective int
StrategyDirective represents the supervisor strategy directive
const ( RestartDirective StrategyDirective = iota StopDirective )
type TypesLoader ¶
type TypesLoader interface { // Register an object with its fully qualified name Register(name string, v any) // Type returns the type of object, Type(v any) (reflect.Type, bool) // TypeByName returns the type of object given its name TypeByName(name string) (reflect.Type, bool) // Parent represents the parent typesLoader Parent() TypesLoader }
TypesLoader represents reflection typesLoader for dynamic loading and creation of actors at run-time
func NewTypesLoader ¶
func NewTypesLoader(parent TypesLoader) TypesLoader
NewTypesLoader creates an instance of TypesLoader