actor

package
v0.0.0-...-dd34702 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: MIT Imports: 22 Imported by: 37

README

Developers guide for the actor package.

This provides an overview of the abstractions and concepts used in the actor package.

Engine

The Hollywood engine is the core of the actor model. It is responsible for spawning actors, sending messages to actors and stopping actors. The engine is also responsible for the lifecycle of the actors.

Receiver / Actor

The receiver is the interface that all actors must implement. It is the interface that the engine uses to communicate with the actor. We sometimes refer to actors as receivers.

Child & Parent

An actor can have a parent. If an actor is spawned by another actor, the spawning actor is the parent of the spawned actor. This is typically used to implement supervision or to facilitate logical routing of messages.

Message

The basis of communication between actors is the message. A message can be of any type. If the message needs to cross the wire, it must be serializable, so a protobuf description must be provided.

Context

The context is a struct that is passed to all user-supplied actors. It should contain all the dependencies that the actor needs to do its work. The context is also used to send messages to other actors.

Request

A request is a message that is sent to an actor synchronously. The request will block until the actor has processed the message and returned a response.

Note, that some performance overhead of sending requests. Inside Hollywood a single-use actor is spawned to handle the request. This actor is then stopped when the response is received.

Since requests are synchronous, they can deadlock if the actor that is processing the request sends a request to the actor that sent the original request. So be careful when using requests.

Remoter

The Remoter interface is an interface that is used to break up a circular dependency between the engine and the remote. The engine needs to be able to send messages to the remote, but the remote also needs to be able to send messages to the engine. The Remoter interface is used to break this circular dependency.

Repeater

A repeater is started in the Engine when you'll like to send a message to an actor at a regular interval.

Event & Event Stream

Since Hollywood is asynchronous, a lot of what would typically be returned as errors are instead broadcasted as events. Each Engine has an Event Stream that can be used to listen for events. The most important event is likely the DeadLetter event. This event is broadcasted when a message is sent to an actor that doesn't exist or cannot be reached.

See events.go for a list of events.

Inbox

Each actor has an inbox. The inbox is implemented as a ring buffer, the size of which is configurable when you spawn an actor. Note that when you run out of capacity in the inbox, the inbox will impose backpressure on the sender. This means that the sender will block until there is capacity in the inbox. So sizing the inbox is important.

Tag

Each actor can have an arbitrary number of tags. Tags are used to route messages to actors. You can send broadcast a message to all actors with a specific tag

Registry

The Engine keeps a list of all local actors in a registry, which is a map of actor names to actor references. The registry is used to route messages to actors.

Scheduler

TODO: Describe the scheduler

Envelope

When a message is sent to an actor is is wrapped in an envelope. The envelope contains the message, the sender and the receiver. The envelope is used to route the message to the correct actor.

Process

A process is an abstraction over the actor. Todo: Describe the process.

Processer

Todo: Not really sure.

PID

A PID is a process identifier. It is used to identify a process. An actor is a PID.

Middleware

Middleware is used to intercept messages before they are sent to the actor. This can be used to implement logging, metrics, tracing, etc.

PoisonPill

When an actor needs to be shutdown, a PoisonPill message is sent to the actor, which will shut down the actor.

Documentation

Index

Constants

View Source
const LocalLookupAddr = "local"

Variables

View Source
var (
	ErrInvalidLength        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflow          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)
View Source
var File_actor_actor_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type ActorDuplicateIdEvent

type ActorDuplicateIdEvent struct {
	PID *PID
}

ActorDuplicateIdEvent gets published if we try to register the same name twice.

func (ActorDuplicateIdEvent) Log

func (e ActorDuplicateIdEvent) Log() (slog.Level, string, []any)

type ActorInitializedEvent

type ActorInitializedEvent struct {
	PID       *PID
	Timestamp time.Time
}

ActorInitializedEvent is broadcasted over the eventStream before an actor received and processed its started event.

func (ActorInitializedEvent) Log

func (e ActorInitializedEvent) Log() (slog.Level, string, []any)

type ActorMaxRestartsExceededEvent

type ActorMaxRestartsExceededEvent struct {
	PID       *PID
	Timestamp time.Time
}

ActorMaxRestartsExceededEvent gets created if an actor crashes too many times

func (ActorMaxRestartsExceededEvent) Log

type ActorRestartedEvent

type ActorRestartedEvent struct {
	PID        *PID
	Timestamp  time.Time
	Stacktrace []byte
	Reason     any
	Restarts   int32
}

ActorRestartedEvent is broadcasted when an actor crashes and gets restarted

func (ActorRestartedEvent) Log

func (e ActorRestartedEvent) Log() (slog.Level, string, []any)

type ActorStartedEvent

type ActorStartedEvent struct {
	PID       *PID
	Timestamp time.Time
}

ActorStartedEvent is broadcasted over the eventStream each time a Receiver (Actor) is spawned and activated. This means, that at the point of receiving this event the Receiver (Actor) is ready to process messages.

func (ActorStartedEvent) Log

func (e ActorStartedEvent) Log() (slog.Level, string, []any)

type ActorStoppedEvent

type ActorStoppedEvent struct {
	PID       *PID
	Timestamp time.Time
}

ActorStoppedEvent is broadcasted over the eventStream each time a process is terminated.

func (ActorStoppedEvent) Log

func (e ActorStoppedEvent) Log() (slog.Level, string, []any)

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Child

func (c *Context) Child(id string) *PID

Child will return the PID of the child (if any) by the given name/id. PID will be nil if it could not find it.

func (*Context) Children

func (c *Context) Children() []*PID

Children returns all child PIDs for the current process.

func (*Context) Context

func (c *Context) Context() context.Context

Context returns a context.Context, user defined on spawn or a context.Background as default

func (*Context) Engine

func (c *Context) Engine() *Engine

Engine returns a pointer to the underlying Engine.

func (*Context) Forward

func (c *Context) Forward(pid *PID)

Forward will forward the current received message to the given PID. This will also set the "forwarder" as the sender of the message.

func (*Context) GetPID

func (c *Context) GetPID(id string) *PID

GetPID returns the PID of the process found by the given id. Returns nil when it could not find any process..

func (*Context) Message

func (c *Context) Message() any

Message returns the message that is currently being received.

func (*Context) PID

func (c *Context) PID() *PID

PID returns the PID of the process that belongs to the context.

func (*Context) Parent

func (c *Context) Parent() *PID

Parent returns the PID of the process that spawned the current process.

func (*Context) Receiver

func (c *Context) Receiver() Receiver

Receiver returns the underlying receiver of this Context.

func (*Context) Request

func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response

See Engine.Request for information. This is just a helper function doing that calls Request on the underlying Engine. c.Engine().Request().

func (*Context) Respond

func (c *Context) Respond(msg any)

Respond will sent the given message to the sender of the current received message.

func (*Context) Send

func (c *Context) Send(pid *PID, msg any)

Send will send the given message to the given PID. This will also set the sender of the message to the PID of the current Context. Hence, the receiver of the message can call Context.Sender() to know the PID of the process that sent this message.

func (*Context) SendRepeat

func (c *Context) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater

SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().

func (*Context) Sender

func (c *Context) Sender() *PID

Sender, when available, returns the PID of the process that sent the current received message.

func (*Context) SpawnChild

func (c *Context) SpawnChild(p Producer, name string, opts ...OptFunc) *PID

SpawnChild will spawn the given Producer as a child of the current Context. If the parent process dies, all the children will be automatically shutdown gracefully. Hence, all children will receive the Stopped message.

func (*Context) SpawnChildFunc

func (c *Context) SpawnChildFunc(f func(*Context), name string, opts ...OptFunc) *PID

SpawnChildFunc spawns the given function as a child Receiver of the current Context.

type DeadLetterEvent

type DeadLetterEvent struct {
	Target  *PID
	Message any
	Sender  *PID
}

DeadLetterEvent is delivered to the deadletter actor when a message can't be delivered to it's recipient

type Engine

type Engine struct {
	Registry *Registry
	// contains filtered or unexported fields
}

Engine represents the actor engine.

func NewEngine

func NewEngine(config EngineConfig) (*Engine, error)

NewEngine returns a new actor Engine given an EngineConfig.

func (*Engine) Address

func (e *Engine) Address() string

Address returns the address of the actor engine. When there is no remote configured, the "local" address will be used, otherwise the listen address of the remote.

func (*Engine) BroadcastEvent

func (e *Engine) BroadcastEvent(msg any)

BroadcastEvent will broadcast the given message over the eventstream, notifying all actors that are subscribed.

func (*Engine) Poison

func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup

Poison will send a graceful poisonPill message to the process that is associated with the given PID. The process will shut down gracefully once it has processed all the messages in the inbox. If given a WaitGroup, it blocks till the process is completely shutdown.

func (*Engine) Request

func (e *Engine) Request(pid *PID, msg any, timeout time.Duration) *Response

Request sends the given message to the given PID as a "Request", returning a response that will resolve in the future. Calling Response.Result() will block until the deadline is exceeded or the response is being resolved.

func (*Engine) Send

func (e *Engine) Send(pid *PID, msg any)

Send sends the given message to the given PID. If the message cannot be delivered due to the fact that the given process is not registered. The message will be sent to the DeadLetter process instead.

func (*Engine) SendLocal

func (e *Engine) SendLocal(pid *PID, msg any, sender *PID)

SendLocal will send the given message to the given PID. If the recipient is not found in the registry, the message will be sent to the DeadLetter process instead. If there is no deadletter process registered, the function will panic.

func (*Engine) SendRepeat

func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater

SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().

func (*Engine) SendWithSender

func (e *Engine) SendWithSender(pid *PID, msg any, sender *PID)

SendWithSender will send the given message to the given PID with the given sender. Receivers receiving this message can check the sender by calling Context.Sender().

func (*Engine) Spawn

func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID

Spawn spawns a process that will producer by the given Producer and can be configured with the given opts.

func (*Engine) SpawnFunc

func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID

SpawnFunc spawns the given function as a stateless receiver/actor.

func (*Engine) SpawnProc

func (e *Engine) SpawnProc(p Processer) *PID

SpawnProc spawns the give Processer. This function is useful when working with custom created Processes. Take a look at the streamWriter as an example.

func (*Engine) Stop

func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup

Stop will send a non-graceful poisonPill message to the process that is associated with the given PID. The process will shut down immediately, once it has processed the poisonPill messsage.

func (*Engine) Subscribe

func (e *Engine) Subscribe(pid *PID)

Subscribe will subscribe the given PID to the event stream.

func (*Engine) Unsubscribe

func (e *Engine) Unsubscribe(pid *PID)

Unsubscribe will un subscribe the given PID from the event stream.

type EngineConfig

type EngineConfig struct {
	// contains filtered or unexported fields
}

EngineConfig holds the configuration of the engine.

func NewEngineConfig

func NewEngineConfig() EngineConfig

NewEngineConfig returns a new default EngineConfig.

func (EngineConfig) WithRemote

func (config EngineConfig) WithRemote(remote Remoter) EngineConfig

WithRemote sets the remote which will configure the engine so its capable to send and receive messages over the network.

type EngineRemoteMissingEvent

type EngineRemoteMissingEvent struct {
	Target  *PID
	Sender  *PID
	Message any
}

EngineRemoteMissingEvent gets published if we try to send a message to a remote actor but the remote system is not available.

func (EngineRemoteMissingEvent) Log

type Envelope

type Envelope struct {
	Msg    any
	Sender *PID
}

type EventLogger

type EventLogger interface {
	Log() (slog.Level, string, []any)
}

EventLogger is an interface that the various Events can choose to implement. If they do, the event stream will log these events to slog.

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

func NewInbox

func NewInbox(size int) *Inbox

func (*Inbox) Send

func (in *Inbox) Send(msg Envelope)

func (*Inbox) Start

func (in *Inbox) Start(proc Processer)

func (*Inbox) Stop

func (in *Inbox) Stop() error

type Inboxer

type Inboxer interface {
	Send(Envelope)
	Start(Processer)
	Stop() error
}

type Initialized

type Initialized struct{}

type InternalError

type InternalError struct {
	From string
	Err  error
}

type MiddlewareFunc

type MiddlewareFunc = func(ReceiveFunc) ReceiveFunc

type OptFunc

type OptFunc func(*Opts)

func WithContext

func WithContext(ctx context.Context) OptFunc

func WithID

func WithID(id string) OptFunc

func WithInboxSize

func WithInboxSize(size int) OptFunc

func WithMaxRestarts

func WithMaxRestarts(n int) OptFunc

func WithMiddleware

func WithMiddleware(mw ...MiddlewareFunc) OptFunc

func WithRestartDelay

func WithRestartDelay(d time.Duration) OptFunc

type Opts

type Opts struct {
	Producer     Producer
	Kind         string
	ID           string
	MaxRestarts  int32
	RestartDelay time.Duration
	InboxSize    int
	Middleware   []MiddlewareFunc
	Context      context.Context
}

func DefaultOpts

func DefaultOpts(p Producer) Opts

DefaultOpts returns default options from the given Producer.

type PID

type PID struct {
	Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	ID      string `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"`
	// contains filtered or unexported fields
}

func NewPID

func NewPID(address, id string) *PID

NewPID returns a new Process ID given an address and an id.

func (*PID) Child

func (pid *PID) Child(id string) *PID

func (*PID) CloneMessageVT

func (m *PID) CloneMessageVT() proto.Message

func (*PID) CloneVT

func (m *PID) CloneVT() *PID

func (*PID) Descriptor deprecated

func (*PID) Descriptor() ([]byte, []int)

Deprecated: Use PID.ProtoReflect.Descriptor instead.

func (*PID) EqualMessageVT

func (this *PID) EqualMessageVT(thatMsg proto.Message) bool

func (*PID) EqualVT

func (this *PID) EqualVT(that *PID) bool

func (*PID) Equals

func (pid *PID) Equals(other *PID) bool

func (*PID) GetAddress

func (x *PID) GetAddress() string

func (*PID) GetID

func (x *PID) GetID() string

func (*PID) LookupKey

func (pid *PID) LookupKey() uint64

func (*PID) MarshalToSizedBufferVT

func (m *PID) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*PID) MarshalToSizedBufferVTStrict

func (m *PID) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)

func (*PID) MarshalToVT

func (m *PID) MarshalToVT(dAtA []byte) (int, error)

func (*PID) MarshalToVTStrict

func (m *PID) MarshalToVTStrict(dAtA []byte) (int, error)

func (*PID) MarshalVT

func (m *PID) MarshalVT() (dAtA []byte, err error)

func (*PID) MarshalVTStrict

func (m *PID) MarshalVTStrict() (dAtA []byte, err error)

func (*PID) ProtoMessage

func (*PID) ProtoMessage()

func (*PID) ProtoReflect

func (x *PID) ProtoReflect() protoreflect.Message

func (*PID) Reset

func (x *PID) Reset()

func (*PID) SizeVT

func (m *PID) SizeVT() (n int)

func (*PID) String

func (pid *PID) String() string

func (*PID) UnmarshalVT

func (m *PID) UnmarshalVT(dAtA []byte) error

type Ping

type Ping struct {
	From *PID `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"`
	// contains filtered or unexported fields
}

func (*Ping) CloneMessageVT

func (m *Ping) CloneMessageVT() proto.Message

func (*Ping) CloneVT

func (m *Ping) CloneVT() *Ping

func (*Ping) Descriptor deprecated

func (*Ping) Descriptor() ([]byte, []int)

Deprecated: Use Ping.ProtoReflect.Descriptor instead.

func (*Ping) EqualMessageVT

func (this *Ping) EqualMessageVT(thatMsg proto.Message) bool

func (*Ping) EqualVT

func (this *Ping) EqualVT(that *Ping) bool

func (*Ping) GetFrom

func (x *Ping) GetFrom() *PID

func (*Ping) MarshalToSizedBufferVT

func (m *Ping) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Ping) MarshalToSizedBufferVTStrict

func (m *Ping) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)

func (*Ping) MarshalToVT

func (m *Ping) MarshalToVT(dAtA []byte) (int, error)

func (*Ping) MarshalToVTStrict

func (m *Ping) MarshalToVTStrict(dAtA []byte) (int, error)

func (*Ping) MarshalVT

func (m *Ping) MarshalVT() (dAtA []byte, err error)

func (*Ping) MarshalVTStrict

func (m *Ping) MarshalVTStrict() (dAtA []byte, err error)

func (*Ping) ProtoMessage

func (*Ping) ProtoMessage()

func (*Ping) ProtoReflect

func (x *Ping) ProtoReflect() protoreflect.Message

func (*Ping) Reset

func (x *Ping) Reset()

func (*Ping) SizeVT

func (m *Ping) SizeVT() (n int)

func (*Ping) String

func (x *Ping) String() string

func (*Ping) UnmarshalVT

func (m *Ping) UnmarshalVT(dAtA []byte) error

type Pong

type Pong struct {
	From *PID `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"`
	// contains filtered or unexported fields
}

func (*Pong) CloneMessageVT

func (m *Pong) CloneMessageVT() proto.Message

func (*Pong) CloneVT

func (m *Pong) CloneVT() *Pong

func (*Pong) Descriptor deprecated

func (*Pong) Descriptor() ([]byte, []int)

Deprecated: Use Pong.ProtoReflect.Descriptor instead.

func (*Pong) EqualMessageVT

func (this *Pong) EqualMessageVT(thatMsg proto.Message) bool

func (*Pong) EqualVT

func (this *Pong) EqualVT(that *Pong) bool

func (*Pong) GetFrom

func (x *Pong) GetFrom() *PID

func (*Pong) MarshalToSizedBufferVT

func (m *Pong) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Pong) MarshalToSizedBufferVTStrict

func (m *Pong) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)

func (*Pong) MarshalToVT

func (m *Pong) MarshalToVT(dAtA []byte) (int, error)

func (*Pong) MarshalToVTStrict

func (m *Pong) MarshalToVTStrict(dAtA []byte) (int, error)

func (*Pong) MarshalVT

func (m *Pong) MarshalVT() (dAtA []byte, err error)

func (*Pong) MarshalVTStrict

func (m *Pong) MarshalVTStrict() (dAtA []byte, err error)

func (*Pong) ProtoMessage

func (*Pong) ProtoMessage()

func (*Pong) ProtoReflect

func (x *Pong) ProtoReflect() protoreflect.Message

func (*Pong) Reset

func (x *Pong) Reset()

func (*Pong) SizeVT

func (m *Pong) SizeVT() (n int)

func (*Pong) String

func (x *Pong) String() string

func (*Pong) UnmarshalVT

func (m *Pong) UnmarshalVT(dAtA []byte) error

type Processer

type Processer interface {
	Start()
	PID() *PID
	Send(*PID, any, *PID)
	Invoke([]Envelope)
	Shutdown(*sync.WaitGroup)
}

Processer is an interface the abstracts the way a process behaves.

type Producer

type Producer func() Receiver

Producer is any function that can return a Receiver

type ReceiveFunc

type ReceiveFunc = func(*Context)

type Receiver

type Receiver interface {
	Receive(*Context)
}

Receiver is an interface that can receive and process messages.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func (*Registry) GetPID

func (r *Registry) GetPID(kind, id string) *PID

GetPID returns the process id associated for the given kind and its id. GetPID returns nil if the process was not found.

func (*Registry) Remove

func (r *Registry) Remove(pid *PID)

Remove removes the given PID from the registry.

type RemoteUnreachableEvent

type RemoteUnreachableEvent struct {
	// The listen address of the remote we are trying to dial.
	ListenAddr string
}

RemoteUnreachableEvent gets published when trying to send a message to an remote that is not reachable. The event will be published after we retry to dial it N times.

type Remoter

type Remoter interface {
	Address() string
	Send(*PID, any, *PID)
	Start(*Engine) error
	Stop() *sync.WaitGroup
}

Remoter is an interface that abstract a remote that is tied to an engine.

type Response

type Response struct {
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse(e *Engine, timeout time.Duration) *Response

func (*Response) Invoke

func (r *Response) Invoke([]Envelope)

func (*Response) PID

func (r *Response) PID() *PID

func (*Response) Result

func (r *Response) Result() (any, error)

func (*Response) Send

func (r *Response) Send(_ *PID, msg any, _ *PID)

func (*Response) Shutdown

func (r *Response) Shutdown(_ *sync.WaitGroup)

func (*Response) Start

func (r *Response) Start()

type Scheduler

type Scheduler interface {
	Schedule(fn func())
	Throughput() int
}

func NewScheduler

func NewScheduler(throughput int) Scheduler

type SendRepeater

type SendRepeater struct {
	// contains filtered or unexported fields
}

SendRepeater is a struct that can be used to send a repeating message to a given PID. If you need to have an actor wake up periodically, you can use a SendRepeater. It is started by the SendRepeat method and stopped by it's Stop() method.

func (SendRepeater) Stop

func (sr SendRepeater) Stop()

Stop will stop the repeating message.

type Started

type Started struct{}

type Stopped

type Stopped struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL