actors

package
v0.0.0-...-6291ac6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing mandatory actor system settings that are missing.
// Compare against them with errors.Is rather than by message text.
var (
	// ErrNameRequired reports that the actor system name is missing.
	// The message names the missing setting, matching ErrNodeAddrRequired.
	ErrNameRequired     = errors.New("actor system name is required")
	// ErrNodeAddrRequired reports that the actor system node address is missing.
	ErrNodeAddrRequired = errors.New("actor system node address is required")
)
View Source
// Errors reported by actors and by remote messaging.
// The first group are plain sentinel errors; the second group are status
// errors (or builders of status errors) carrying a status code.
var (
	// ErrNotReady signals that the actor is not ready.
	ErrNotReady       = errors.New("actor is not ready")
	// ErrUnhandled signals that a message was not handled.
	ErrUnhandled      = errors.New("unhandled message")
	// ErrMissingConfig signals that the config is missing.
	ErrMissingConfig  = errors.New("config is missing")
	// ErrUndefinedActor signals that the actor is not defined.
	ErrUndefinedActor = errors.New("actor is not defined")
	// ErrRequestTimeout signals that a request timed out.
	ErrRequestTimeout = errors.New("request timed out")
	// ErrEmptyBehavior signals that no behavior is defined.
	ErrEmptyBehavior  = errors.New("no behavior defined")

	// ErrRemoteSendInvalidActorSystem is a FailedPrecondition status error for an invalid actor system.
	ErrRemoteSendInvalidActorSystem = status.Error(codes.FailedPrecondition, "invalid actor system")
	// ErrRemoteSendInvalidNode is a FailedPrecondition status error for an invalid actor system node.
	ErrRemoteSendInvalidNode        = status.Error(codes.FailedPrecondition, "invalid actor system node")
	// ErrRemoteActorNotFound builds a NotFound status error that names the remote actor address addr.
	// It is a function, not an error value, so it must be called to obtain the error.
	ErrRemoteActorNotFound          = func(addr string) error { return status.Errorf(codes.NotFound, "remote actor=%s not found", addr) }
	// ErrRemoteSendFailure builds an Internal status error carrying err's message.
	// It calls err.Error(), so err must be non-nil.
	ErrRemoteSendFailure            = func(err error) error { return status.Error(codes.Internal, err.Error()) }
)
View Source
// RemoteNoSender is an empty address value meaning that there is no sender.
var RemoteNoSender = &pb.Address{}

RemoteNoSender means that there is no sender

Functions

func RemoteLookup

func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *pb.Address, err error)

RemoteLookup looks up an actor address on a remote node.

func RemoteSendAsync

func RemoteSendAsync(ctx context.Context, to *pb.Address, message proto.Message) error

RemoteSendAsync sends a message to an actor remotely without expecting any reply

func RemoteSendSync

func RemoteSendSync(ctx context.Context, to *pb.Address, message proto.Message) (response proto.Message, err error)

RemoteSendSync sends a synchronous message to another actor remotely and expects a response.

func SendAsync

func SendAsync(ctx context.Context, to PID, message proto.Message) error

SendAsync sends an asynchronous message to an actor

func SendSync

func SendSync(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

SendSync sends a synchronous message to another actor and expects a response. This call blocks until a response is received or the timeout elapses.

Types

type Actor

// Actor is the interface implemented by any user who wants to create an actor.
type Actor interface {
	// PreStart pre-starts the actor. This function can be used to set up database connections
	// or perform some other initialization before the actor starts processing messages.
	PreStart(ctx context.Context) error
	// Receive processes any message dropped into the actor mailbox.
	// The receiver of a message can either reply to the sender with a new message, or reply to the
	// message synchronously by setting the reply of the message. The latter approach is often used
	// when an external service is communicating with the actor.
	// Actors can communicate synchronously as well, but doing so will hinder the performance of the system.
	Receive(ctx ReceiveContext)
	// PostStop is executed when the actor is shutting down.
	// It runs once every message that has not yet been processed has been processed, before the actor shuts down.
	PostStop(ctx context.Context) error
}

Actor represents the Actor interface. It is implemented by any user who wants to create an actor.

type ActorSystem

// ActorSystem defines the contract of an actor system: identity and location
// accessors, lifecycle control of the system itself, and lifecycle control of
// the actors it hosts.
type ActorSystem interface {
	// Name returns the actor system name
	Name() string
	// NodeAddr returns the node where the actor system is running
	NodeAddr() string
	// Actors returns the list of Actors that are alive in the actor system
	Actors() []PID
	// Host returns the actor system host address
	Host() string
	// Port returns the actor system host port
	Port() int
	// Start starts the actor system
	Start(ctx context.Context) error
	// Stop stops the actor system
	Stop(ctx context.Context) error
	// StartActor creates an actor with the given name in the system, starts it,
	// and returns its PID. No error is returned by this method.
	StartActor(ctx context.Context, name string, actor Actor) PID
	// StopActor stops the actor with the given name in the system
	StopActor(ctx context.Context, name string) error
	// RestartActor restarts the actor with the given name in the system and returns its PID
	RestartActor(ctx context.Context, name string) (PID, error)
	// NumActors returns the total number of active actors in the system
	NumActors() uint64
	// contains filtered or unexported methods
}

ActorSystem defines the contract of an actor system

func NewActorSystem

func NewActorSystem(config *Config) (ActorSystem, error)

NewActorSystem creates an instance of ActorSystem

type Address

// Address represents the physical location under which an Actor can be reached.
// Its accessors expose a protocol, an actor system name, a host and a port.
type Address struct {
	// contains filtered or unexported fields
}

Address represents the physical location under which an Actor can be reached. Examples are local addresses, identified by the ActorSystem’s name, and remote addresses, identified by protocol, host and port.

func NewAddress

func NewAddress(protocol string, system string, host string, port int) *Address

NewAddress creates an instance of Address

func (*Address) Host

func (a *Address) Host() string

Host returns the host

func (*Address) HostPort

func (a *Address) HostPort() string

HostPort returns the host and port in the following string format @host:port

func (*Address) IsLocal

func (a *Address) IsLocal() bool

IsLocal returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote hosts.

func (*Address) IsRemote

func (a *Address) IsRemote() bool

IsRemote returns true if this Address is usable globally. Unlike locally defined addresses, addresses of global scope are safe to send to other hosts, as they globally and uniquely identify an addressable entity.

func (*Address) Parse

func (a *Address) Parse(address string) *Address

Parse parses a new Address from a given string

func (*Address) Port

func (a *Address) Port() int

Port returns the port number

func (*Address) Protocol

func (a *Address) Protocol() string

Protocol returns the protocol

func (*Address) String

func (a *Address) String() string

String returns the canonical String representation of this Address formatted as: `protocol://system@host:port`

func (*Address) System

func (a *Address) System() string

System returns the actor system name

func (*Address) WithHost

func (a *Address) WithHost(host string) *Address

WithHost sets the host of a given Address and returns a new instance of the address

func (*Address) WithPort

func (a *Address) WithPort(port int) *Address

WithPort sets the port of a given Address and returns a new instance of the address

func (*Address) WithProtocol

func (a *Address) WithProtocol(protocol string) *Address

WithProtocol sets the protocol of a given Address and returns a new instance of the address

func (*Address) WithSystem

func (a *Address) WithSystem(system string) *Address

WithSystem sets the actor system of a given Address and returns a new instance of the address

type Behavior

// Behavior defines an actor behavior: a function that handles a message
// delivered through a ReceiveContext.
type Behavior func(ctx ReceiveContext)

Behavior defines an actor behavior

type BehaviorStack

// BehaviorStack defines a stack of Behavior, backed by a slice.
type BehaviorStack []Behavior

BehaviorStack defines a stack of Behavior

func NewBehaviorStack

func NewBehaviorStack() BehaviorStack

NewBehaviorStack creates an instance of BehaviorStack

func (*BehaviorStack) Clear

func (b *BehaviorStack) Clear()

Clear empties the stack

func (*BehaviorStack) IsEmpty

func (b *BehaviorStack) IsEmpty() bool

IsEmpty checks if stack is empty

func (*BehaviorStack) Len

func (b *BehaviorStack) Len() int

Len returns the length of the stack.

func (*BehaviorStack) Peek

func (b *BehaviorStack) Peek() (behavior Behavior, ok bool)

Peek helps view the top item on the stack

func (*BehaviorStack) Pop

func (b *BehaviorStack) Pop() (behavior Behavior, ok bool)

Pop removes and returns the top element of the stack. It returns false if the stack is empty.

func (*BehaviorStack) Push

func (b *BehaviorStack) Push(behavior Behavior)

Push a new value onto the stack

type Config

// Config represents the actor system configuration.
// Instances are created with NewConfig and customized through Option values.
type Config struct {
	// contains filtered or unexported fields
}

Config represents the actor system configuration

func NewConfig

func NewConfig(name, nodeHostAndPort string, options ...Option) (*Config, error)

NewConfig creates an instance of Config

func (Config) ActorInitMaxRetries

func (c Config) ActorInitMaxRetries() int

ActorInitMaxRetries returns the actor init max retries

func (Config) ExpireActorAfter

func (c Config) ExpireActorAfter() time.Duration

ExpireActorAfter returns the expireActorAfter

func (Config) HostAndPort

func (c Config) HostAndPort() (host string, port int)

HostAndPort returns the host and the port

func (Config) Logger

func (c Config) Logger() log.Logger

Logger returns the logger

func (Config) Name

func (c Config) Name() string

Name returns the actor system name

func (Config) NodeHostAndPort

func (c Config) NodeHostAndPort() string

NodeHostAndPort returns the node host and port

func (Config) ReplyTimeout

func (c Config) ReplyTimeout() time.Duration

ReplyTimeout returns the reply timeout

type Option

// Option is the interface that applies a configuration option to a Config.
type Option interface {
	// Apply sets the Option value of a config.
	Apply(config *Config)
}

Option is the interface that applies a configuration option.

func WithActorInitMaxRetries

func WithActorInitMaxRetries(max int) Option

WithActorInitMaxRetries sets the number of times to retry an actor init process

func WithExpireActorAfter

func WithExpireActorAfter(duration time.Duration) Option

WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the actor system custom logger

func WithPassivationDisabled

func WithPassivationDisabled() Option

WithPassivationDisabled disables the passivation mode

func WithRemoting

func WithRemoting() Option

WithRemoting enables remoting on the actor system

func WithReplyTimeout

func WithReplyTimeout(timeout time.Duration) Option

WithReplyTimeout sets how long an actor has to reply to a command in a receive-reply pattern. The timeout is given as a time.Duration.

func WithSupervisorStrategy

func WithSupervisorStrategy(strategy pb.StrategyDirective) Option

WithSupervisorStrategy sets the supervisor strategy

func WithTelemetry

func WithTelemetry(telemetry *telemetry.Telemetry) Option

WithTelemetry sets the custom telemetry

type OptionFunc

// OptionFunc is a function type that implements the Option interface
// through its Apply method.
type OptionFunc func(*Config)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(c *Config)

type PID

// PID defines the various actions one can perform on a given actor.
type PID interface {
	// Shutdown gracefully shuts down the given actor
	Shutdown(ctx context.Context) error
	// IsOnline returns true when the actor is online and ready to process messages, and false
	// when the actor is stopped or not started at all
	IsOnline() bool
	// ReceivedCount returns the total number of messages processed by the actor
	// at a given point in time while the actor heart is still beating
	ReceivedCount(ctx context.Context) uint64
	// ErrorsCount returns the total number of panic attacks that occur while the actor is processing messages
	// at a given point in time while the actor heart is still beating
	ErrorsCount(ctx context.Context) uint64
	// SpawnChild creates a child actor
	SpawnChild(ctx context.Context, name string, actor Actor) (PID, error)
	// Restart restarts the actor
	Restart(ctx context.Context) error
	// Watch starts watching a given actor
	Watch(pid PID)
	// UnWatch stops watching a given actor
	UnWatch(pid PID)
	// ActorSystem returns the underlying actor system
	ActorSystem() ActorSystem
	// ActorPath returns the path of the actor
	ActorPath() *Path
	// Tell sends an asynchronous message to another PID
	Tell(ctx context.Context, to PID, message proto.Message) error
	// Ask sends a synchronous message to another actor and expects a response.
	Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error)
	// RemoteTell sends a message to an actor remotely without expecting any reply.
	// The receiver can reply to the sender using the Sender of the RemoteMessage
	RemoteTell(ctx context.Context, to *pb.Address, message proto.Message) error
	// RemoteAsk sends a synchronous message to another actor remotely and expects a response.
	// This is one-way communication between two actors remotely
	RemoteAsk(ctx context.Context, to *pb.Address, message proto.Message) (response proto.Message, err error)
	// RemoteLookup looks for an actor address on a remote node. If the actorSystem is nil then the lookup will be done
	// using the same actor system as the PID actor system
	RemoteLookup(ctx context.Context, host string, port int, name string, actorSystem *string) (addr *pb.Address, err error)
	// RestartCount returns the number of times the actor has restarted
	RestartCount(ctx context.Context) uint64
	// MailboxSize returns the mailbox size at a given time
	MailboxSize(ctx context.Context) uint64
	// Children returns the list of all the children of the given actor
	Children(ctx context.Context) []PID
	// contains filtered or unexported methods
}

PID defines the various actions one can perform on a given actor

// NoSender means that there is no sender.
// It is declared without an initializer, so it holds the nil PID value.
var NoSender PID

NoSender means that there is no sender

type Path

// Path is a unique path to an actor.
// Its accessors expose a name, an Address, a unique ID and an optional parent path.
type Path struct {
	// contains filtered or unexported fields
}

Path is a unique path to an actor

func NewPath

func NewPath(name string, address *Address) *Path

NewPath creates an immutable Path

func (*Path) Address

func (a *Path) Address() *Address

Address returns the actor path address

func (*Path) ID

func (a *Path) ID() uuid.UUID

ID returns the internal unique id of the actor that this path refers to.

func (*Path) Name

func (a *Path) Name() string

Name returns the name of the actor that this path refers to.

func (*Path) Parent

func (a *Path) Parent() *Path

Parent returns the parent path

func (*Path) RemoteAddress

func (a *Path) RemoteAddress() *pb.Address

RemoteAddress returns the remote address derived from the path

func (*Path) String

func (a *Path) String() string

String returns the string representation of an actorPath

func (*Path) WithParent

func (a *Path) WithParent(parent *Path) *Path

WithParent sets the parent actor path and returns a new path. The receiver is left unchanged.

type ReceiveContext

// ReceiveContext is the context that is used by the actor to receive messages.
// It exposes the message being processed, its sender and receiver, a way to
// reply, and the operations that switch the actor's behavior.
type ReceiveContext interface {
	// Context returns the context attached to the message
	Context() context.Context
	// Sender of the message
	Sender() PID
	// Self represents the actor receiving the message.
	Self() PID
	// Message is the actual message sent
	Message() proto.Message
	// Response sets the message response.
	// Use this method within the Actor.Receive method of the actor to set a reply.
	// This can only be used with the request-response pattern. For async communication
	// this operation will amount to nothing.
	Response(resp proto.Message)
	// Become switches the current behavior of the actor to a new behavior.
	// The current message in process during the transition will still be processed with the current
	// behavior before the transition. However, subsequent messages will be processed with the new behavior.
	// One needs to call UnBecome to reset the actor behavior to the default one, which is the
	// Actor.Receive method.
	Become(behavior Behavior)
	// UnBecome resets the actor behavior to the default one, which is the
	// Actor.Receive method
	UnBecome()
	// BecomeStacked sets a new behavior to the actor.
	// The current message in process during the transition will still be processed with the current
	// behavior before the transition. However, subsequent messages will be processed with the new behavior.
	// One needs to call UnBecomeStacked to go back to the actor's previous behavior.
	BecomeStacked(behavior Behavior)
	// UnBecomeStacked sets the actor behavior to the previous behavior before BecomeStacked was called
	UnBecomeStacked()
}

ReceiveContext is the context that is used by the actor to receive messages

type Unit

// Unit is an empty struct type carrying no data.
// NOTE(review): its intended use is not documented here — confirm with the package authors.
type Unit struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL