actors

Version: v0.0.0-...-d11619d
Published: Jul 18, 2022 License: Apache-2.0 Imports: 7 Imported by: 4

README

Actors

The purpose of this package is to allow you to create actors in Go.

An introduction and tutorial to using this library is available.

Think of an actor like a little server: it is created, it receives messages and processes them, and it terminates. As part of processing a message, it can reply to the message, it can send messages to other actors that it knows of, it can create new actors, and it can terminate.

Each actor has a mailbox. Clients of the actor call the actor's public API, which will typically post messages into this mailbox. The actor will see that there are messages in the mailbox, retrieve each message in turn, and process it. Whilst posting a message into a mailbox is mainly an asynchronous activity, the message itself may require a reply. The sender can choose to block and wait for the reply at any time. This functionality is provided by the MsgSync and MsgSyncBase types.

Each actor-server is a Go-routine. The mailbox is implemented by chans. It is safe for multiple concurrent Go-routines to post messages to the same actor, and an actor-server itself is always single-threaded: a single Go-routine. More elaborate setups are left to you: some sort of sharding or session management across a set of actors of the same type is perfectly possible, but is not currently provided directly by this package.
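The following is a minimal sketch of the idea described above, written with the standard library only. It is not this package's implementation: the counter type, spawnCounter and runDemo are hypothetical names, and a buffered chan stands in for the real mailbox. It shows several Go-routines posting concurrently while a single Go-routine owns the state.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical actor. Its state (total) is only ever
// touched by the one Go-routine started in spawnCounter.
type counter struct {
	mailbox chan int      // stand-in for the actor's mailbox
	done    chan struct{} // closed when the actor's Go-routine exits
	total   int
}

func spawnCounter() *counter {
	c := &counter{mailbox: make(chan int, 16), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		// Messages are processed one at a time, so no lock is needed.
		for n := range c.mailbox {
			c.total += n
		}
	}()
	return c
}

// runDemo posts from several Go-routines at once, then stops the
// actor and returns the total it accumulated.
func runDemo(senders, perSender int) int {
	c := spawnCounter()
	var wg sync.WaitGroup
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perSender; j++ {
				c.mailbox <- 1
			}
		}()
	}
	wg.Wait()
	close(c.mailbox) // no more senders: let the actor drain and exit
	<-c.done         // after this, reading c.total is race-free
	return c.total
}

func main() {
	fmt.Println(runDemo(4, 100)) // prints 400
}
```

Because only the actor's Go-routine mutates total, the senders need no synchronisation beyond the chan itself.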

Use different struct types to implement the client-side and the server-side of each actor. The client-side should embed (probably anonymously) an implementation of the Client type. The server-side should anonymously embed an implementation of the Server type (either BackPressureServerBase or ServerBase).

The server-side of each actor must satisfy the Server interface, and so has to provide three methods: Init, HandleMsg, and Terminated. BackPressureServerBase and ServerBase have implementations of all of these, so you need only reimplement the ones you need. For all methods that you implement yourself on the server-side, make certain that you also call the embedded version. If you fail to do this for Init, then various parts of the actor will not be set up properly. If you fail to do this for HandleMsg, then the actor will not be able to be terminated, and if you fail to do this for Terminated, then termination subscriptions will not function correctly.
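One way to picture "also call the embedded version" is the sketch below. It uses stand-in types only: baseServer, terminateMsg, errTerminate and greeter are hypothetical and merely mimic the shape of ServerBase.HandleMsg. The point is the default branch, which hands every message the actor does not recognise to the embedded implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errTerminate, terminateMsg and baseServer are hypothetical stand-ins
// for the package's own termination handling in ServerBase.
var errTerminate = errors.New("terminate requested")

type terminateMsg struct{}

type baseServer struct{}

// HandleMsg understands the built-in terminate message.
func (b *baseServer) HandleMsg(msg interface{}) error {
	if _, ok := msg.(terminateMsg); ok {
		return errTerminate
	}
	return nil
}

// greeter is the server-side of a hypothetical actor. It embeds the
// base anonymously and overrides HandleMsg.
type greeter struct {
	baseServer
	greetings []string
}

func (g *greeter) HandleMsg(msg interface{}) error {
	switch m := msg.(type) {
	case string:
		g.greetings = append(g.greetings, "hello "+m)
		return nil
	default:
		// Without this delegation the terminate message would be
		// silently ignored and the actor could never be stopped.
		return g.baseServer.HandleMsg(msg)
	}
}

func main() {
	g := &greeter{}
	fmt.Println(g.HandleMsg("gopher"), g.greetings)
	fmt.Println(g.HandleMsg(terminateMsg{}))
}
```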

Init is the first method invoked by the new actor, and is called synchronously as part of spawning the new actor. I.e. at the point that Init is invoked (in the new actor's Go-routine), Spawn will not have returned, and no one will yet know of the existence of this new actor. So, if the new actor decides to send messages to itself, it can guarantee that those messages will be the first items in its own mailbox. This is useful for being able to do deferred asynchronous initialisation, for example. If Init fails (returns a non-nil error), then Spawn fails too, and returns the same error.
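The ordering guarantee can be modelled with plain chans, as in the sketch below. Everything here is hypothetical (actor, spawn, demo, failingSpawn, errInit), and a small buffered chan stands in for the real mailbox so that the initialisation function can post to itself without blocking. Because spawn does not return until the initialisation function has finished, a message posted from within it is always ahead of anything a client sends.

```go
package main

import (
	"errors"
	"fmt"
)

// actor is a hypothetical stand-in; it is not the package's type.
type actor struct {
	mailbox   chan string
	processed chan []string // receives the messages seen, in order, on exit
}

// spawn runs initFn in the new Go-routine and only returns once it
// has completed, mirroring the behaviour described for Spawn and Init.
func spawn(initFn func(self *actor) error) (*actor, error) {
	a := &actor{mailbox: make(chan string, 8), processed: make(chan []string, 1)}
	ready := make(chan error)
	go func() {
		err := initFn(a)
		ready <- err // the caller of spawn is released only now
		if err != nil {
			return
		}
		var got []string
		for msg := range a.mailbox {
			got = append(got, msg)
		}
		a.processed <- got
	}()
	if err := <-ready; err != nil {
		return nil, err
	}
	return a, nil
}

// demo posts one message from the initialisation function and one
// from a client, and returns the order in which the actor saw them.
func demo() []string {
	a, err := spawn(func(self *actor) error {
		self.mailbox <- "finish initialisation" // deferred initialisation
		return nil
	})
	if err != nil {
		return nil
	}
	a.mailbox <- "first client request"
	close(a.mailbox)
	return <-a.processed
}

var errInit = errors.New("init failed")

// failingSpawn shows that an initialisation error is returned by spawn.
func failingSpawn() error {
	_, err := spawn(func(*actor) error { return errInit })
	return err
}

func main() {
	fmt.Println(demo())
	fmt.Println(failingSpawn())
}
```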

HandleMsg is invoked for each message that is received from the actor's mailbox. HandleMsg returns an error; if the error is non-nil then the actor terminates: the actor's mailbox is closed, Terminated is invoked, and finally the actor's Go-routine exits. All remaining messages in the actor's mailbox are discarded, and anyone else waiting on responses from the actor will be informed that the actor has terminated and no further responses will be forthcoming.
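As a rough illustration of that termination rule, the sketch below replaces the mailbox with a slice so that it runs synchronously. runLoop, quitOnQuit and errNormal are hypothetical names (errNormal plays the role of ErrNormalActorTermination); the real actor loop lives inside the package and is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errNormal is a hypothetical stand-in for ErrNormalActorTermination.
var errNormal = errors.New("normal termination")

// runLoop hands each message to handle until handle returns a non-nil
// error. It reports which messages were handled, how many were left
// behind and discarded, and the error that ended the loop.
func runLoop(mailbox []string, handle func(string) error) (handled []string, discarded int, err error) {
	for i, msg := range mailbox {
		handled = append(handled, msg)
		if err = handle(msg); err != nil {
			discarded = len(mailbox) - i - 1
			break
		}
	}
	return handled, discarded, err
}

// quitOnQuit asks for normal termination when it sees "quit".
func quitOnQuit(msg string) error {
	if msg == "quit" {
		return errNormal
	}
	return nil
}

func main() {
	handled, discarded, err := runLoop([]string{"a", "quit", "b", "c"}, quitOnQuit)
	fmt.Println(handled, discarded, err) // prints [a quit] 2 normal termination
}
```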

Documentation

Constants

const QuotaSize = 1000

Variables

var ErrNormalActorTermination = errors.New("Normal Actor Termination")

Use this error as the return value from any actor server-side call-back in order to indicate the actor should terminate in a normal way. I.e. no real error has occurred, the actor just wishes to terminate.

Functions

This section is empty.

Types

type ActorHook

type ActorHook struct {
	PreReceiveHook func()
	PostSendHook   func()
}

type ActorHooks

type ActorHooks []ActorHook

type BackPressureClientBase

type BackPressureClientBase struct {
	*ClientBase
	// contains filtered or unexported fields
}

func (*BackPressureClientBase) Send

func (self *BackPressureClientBase) Send(msg mailbox.Msg) (success bool)

Same as ClientBase.Send() - i.e. posts the message to the actor's mailbox. But does additional work to obey the current back-pressure mechanism.

func (*BackPressureClientBase) SendSync

func (self *BackPressureClientBase) SendSync(msg MsgSync, waitForReply bool) (success bool)

Same as ClientBase.SendSync() - i.e. posts a MsgSync message to the actor's mailbox and optionally waits for the server to reply or terminate. But does additional work to obey the current back-pressure mechanism.

type BackPressureClientBaseFactory

type BackPressureClientBaseFactory struct {
	// contains filtered or unexported fields
}

BackPressureClientBaseFactory must be used together with BackPressureServerBase. It allocates a quota to each client, which means each client now carries some mutable state. For this reason, the client-side should follow a factory pattern: i.e. the ClientBase that is returned from Spawn() should not be used directly; instead pass it to NewBackPressureClientBaseFactory and call NewClient() on that.

func NewBackPressureClientBaseFactory

func NewBackPressureClientBaseFactory(client *ClientBase) *BackPressureClientBaseFactory

func (*BackPressureClientBaseFactory) NewClient

type BackPressureServerBase

type BackPressureServerBase struct {
	ServerBase
}

This is deliberately a very simple implementation of back pressure: every 1000 sends from each client, we force the client to do a round-trip to the server. If the server is overloaded, this will force clients to block and wait.

It is almost certainly possible to do something more sophisticated, probably with ingress and egress rates, and trying to minimise queue lengths to optimise for latency. But I couldn't make it work, so, KISS.

You can use BackPressureServerBase in place of ServerBase. If you do so, you must also use BackPressureClientBaseFactory in place of ClientBase.
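The quota idea can be sketched with the standard library as follows. This is a simplified model, not the package's mechanism: the msg, client, server and demo names are hypothetical, the quota is 3 rather than QuotaSize, and the acknowledgement is piggy-backed on an ordinary message instead of using dedicated quota messages. Every time the client exhausts its quota it blocks until the server has worked through the mailbox up to that point.

```go
package main

import "fmt"

// quota is deliberately tiny for the demo; the package's QuotaSize is 1000.
const quota = 3

type msg struct {
	payload int
	ack     chan struct{} // non-nil: the server must signal when it gets here
}

// client carries mutable state (remaining), which is why each user of
// the actor needs its own client value.
type client struct {
	mailbox    chan<- msg
	remaining  int
	roundTrips int
}

func (c *client) send(payload int) {
	m := msg{payload: payload}
	c.remaining--
	if c.remaining == 0 {
		m.ack = make(chan struct{})
	}
	c.mailbox <- m
	if m.ack != nil {
		<-m.ack // blocks until the server has caught up
		c.remaining = quota
		c.roundTrips++
	}
}

func server(mailbox <-chan msg, done chan<- int) {
	sum := 0
	for m := range mailbox {
		sum += m.payload
		if m.ack != nil {
			close(m.ack)
		}
	}
	done <- sum
}

// demo sends 1..n and returns the server's sum and the number of
// forced round-trips the client made.
func demo(n int) (sum, roundTrips int) {
	mailbox := make(chan msg, 64)
	done := make(chan int)
	go server(mailbox, done)
	c := &client{mailbox: mailbox, remaining: quota}
	for i := 1; i <= n; i++ {
		c.send(i)
	}
	close(mailbox)
	return <-done, c.roundTrips
}

func main() {
	fmt.Println(demo(10)) // prints 55 3
}
```

A slow server delays the acknowledgement, so a fast client can never run more than one quota ahead of it.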

func (*BackPressureServerBase) HandleMsg

func (self *BackPressureServerBase) HandleMsg(msg mailbox.Msg) (err error)

For the Server interface.

Understands quota messages.

type Client

type Client interface {
	// Post a message into the actor's mailbox. Returns true iff it was
	// possible to add the message to the actor's mailbox. There is no
	// guarantee that the actor will retrieve and process the message
	// before it terminates.
	Send(msg mailbox.Msg) (success bool)

	// Post a synchronous message into the actor's mailbox. A
	// synchronous message can be waited upon for the message to be
	// processed by the server-side of the actor.
	//
	// If waitForReply is true, then this method will block until:
	//
	//   a) the mailbox is closed before we can enqueue the message
	//      (the actor has terminated), in which case we return false.
	//
	//   b) the message is enqueued, but the actor terminates before it
	//      can process the message, in which case we return false.
	//
	//   c) the message is enqueued, retrieved, processed by the actor,
	//      and the actor marks the processing of the message as being
	//      complete, in which case we return true.
	//
	// If waitForReply is false then SendSync will return true iff it
	// is able to enqueue the message into the mailbox (case (a)
	// above). It is then up to the caller to invoke msg.WaitForReply()
	// before it accesses any reply fields in the message.
	SendSync(msg MsgSync, waitForReply bool) (success bool)

	// Request the actor terminates. Does not return until the actor
	// has terminated: the server-side Terminated method must have
	// finished before this returns. Idempotent.
	TerminateSync()

	// Creates a subscription to observe the termination of the actor.
	//
	// If the subscription cannot be created (the actor terminated
	// before the subscription could be registered) then the returned
	// value will be nil. In this case, it is guaranteed the callback
	// function will never be invoked.
	//
	// If the returned value is non-nil then it is guaranteed the
	// callback function will be invoked exactly once when the actor
	// terminates (unless the subscription is cancelled before the
	// actor terminates).
	//
	// If the callback function is invoked, it is invoked in a fresh
	// go-routine, and does not block the termination of the actor. It
	// is invoked with the exact same subscription object as the method
	// returns (which is useful for identification purposes), along
	// with the error, if any, which caused the actor to terminate.
	OnTermination(func(subscription *TerminationSubscription, err error, caughtPanic interface{})) *TerminationSubscription
}

Low-level client-side interface to every actor.

type ClientBase

type ClientBase struct {
	// contains filtered or unexported fields
}

ClientBase implements the Client interface and provides the basic low-level client-side functionality to send messages to your actor.

func Spawn

func Spawn(log zerolog.Logger, server Server, name string) (*ClientBase, error)

Synchronously creates a new actor. The error returned is the error from the new actor's server-side Init method.

func SpawnWithExtraHooks

func SpawnWithExtraHooks(log zerolog.Logger, hooks ActorHooks, server Server, name string) (*ClientBase, error)

func (*ClientBase) OnTermination

func (self *ClientBase) OnTermination(fun func(subscription *TerminationSubscription, err error, caughtPanic interface{})) *TerminationSubscription

For the Client interface.

func (*ClientBase) Send

func (self *ClientBase) Send(msg mailbox.Msg) (success bool)

For the Client interface.

func (*ClientBase) SendSync

func (self *ClientBase) SendSync(msg MsgSync, waitForReply bool) (success bool)

For the Client interface.

func (*ClientBase) TerminateSync

func (self *ClientBase) TerminateSync()

For the Client interface.

type FiniteStateMachine

type FiniteStateMachine struct {
	Server
	// contains filtered or unexported fields
}

A FiniteStateMachine is an actor server which forwards messages to its current state (a FiniteStateMachineState). The current state can return a nextState from HandleMsg, or it can explicitly call Become in order to set the next state.

Whenever the state changes, Enter is called immediately on the new state.

func (*FiniteStateMachine) Become

func (self *FiniteStateMachine) Become(nextState FiniteStateMachineState) (err error)

Sets the FSM's state to be nextState. If nextState is non-nil and different from the current state, returns the result of nextState.Enter().

func (*FiniteStateMachine) HandleMsg

func (self *FiniteStateMachine) HandleMsg(msg mailbox.Msg) (err error)

For the Server interface.

Calls HandleMsg on the FSM's state. If that handler returns a nil error, returns the result of Become(nextState).

type FiniteStateMachineState

type FiniteStateMachineState interface {
	// Enter is called immediately as soon as this state becomes the
	// current state for the FiniteStateMachine. It is only called if
	// the previous current state was different.
	Enter() (err error)

	// HandleMsg serves the same purpose as Server.HandleMsg, only it's
	// extended here to allow the nextState to be set. You are allowed
	// to return nil as the nextState, which is interpreted as
	// no-change.
	HandleMsg(msg mailbox.Msg) (nextState FiniteStateMachineState, err error)
}
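The interplay of Enter, HandleMsg and Become can be sketched without the package, as below. All names are hypothetical (state, machine, turnstile and the two state types), and messages are plain strings rather than mailbox.Msg. The sketch follows the documented rules: a nil next state means no change, and Enter only runs when the state actually changes.

```go
package main

import "fmt"

// state mirrors the shape of FiniteStateMachineState, with string messages.
type state interface {
	Enter() error
	HandleMsg(msg string) (next state, err error)
}

type machine struct{ current state }

// become switches state and calls Enter, unless next is nil or unchanged.
func (m *machine) become(next state) error {
	if next == nil || next == m.current {
		return nil
	}
	m.current = next
	return next.Enter()
}

// handleMsg forwards to the current state, then applies its choice of next state.
func (m *machine) handleMsg(msg string) error {
	next, err := m.current.HandleMsg(msg)
	if err != nil {
		return err
	}
	return m.become(next)
}

type turnstile struct {
	machine
	locked, unlocked state
	trace            []string // records every call to Enter
}

type lockedState struct{ t *turnstile }

func (s *lockedState) Enter() error {
	s.t.trace = append(s.t.trace, "enter locked")
	return nil
}

func (s *lockedState) HandleMsg(msg string) (state, error) {
	if msg == "coin" {
		return s.t.unlocked, nil
	}
	return nil, nil // no change
}

type unlockedState struct{ t *turnstile }

func (s *unlockedState) Enter() error {
	s.t.trace = append(s.t.trace, "enter unlocked")
	return nil
}

func (s *unlockedState) HandleMsg(msg string) (state, error) {
	if msg == "push" {
		return s.t.locked, nil
	}
	return nil, nil // no change
}

// runTurnstile feeds msgs to a fresh turnstile and returns the Enter trace.
func runTurnstile(msgs ...string) ([]string, error) {
	t := &turnstile{}
	t.locked = &lockedState{t: t}
	t.unlocked = &unlockedState{t: t}
	if err := t.become(t.locked); err != nil {
		return nil, err
	}
	for _, msg := range msgs {
		if err := t.handleMsg(msg); err != nil {
			return t.trace, err
		}
	}
	return t.trace, nil
}

func main() {
	trace, err := runTurnstile("push", "coin", "coin", "push")
	fmt.Println(trace, err)
}
```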

type ManagerClient

type ManagerClient interface {
	Client
	// Spawn a new actor based on the provided Server and name. The new
	// actor is a child of the manager.
	Spawn(server Server, name string) (*ClientBase, error)
}

Managers exist to support the management of child actors.

• If a child actor terminates with ErrNormalActorTermination (normal termination) then the manager and all its other children continue to work.

• If a child actor terminates for any other reason (abnormal termination) then the manager actor itself terminates.

• Whenever the manager terminates, it makes sure that all its child actors have terminated.

• Because TerminateSync is synchronous, calling TerminateSync on a manager will not return until all its children have fully terminated too.
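The rules above amount to a small decision made each time a child terminates. The sketch below models only that decision, synchronously and with hypothetical names (manager, newManager, childTerminated, errNormal, errCrash). Whether the package compares errors with errors.Is or with == is not stated here, so the use of errors.Is is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errNormal stands in for ErrNormalActorTermination; errCrash is any other error.
var (
	errNormal = errors.New("normal actor termination")
	errCrash  = errors.New("child crashed")
)

type manager struct {
	children   map[string]bool
	terminated bool
	cause      error
}

func newManager(names ...string) *manager {
	m := &manager{children: make(map[string]bool)}
	for _, n := range names {
		m.children[n] = true
	}
	return m
}

// childTerminated applies the manager's rules for one child exit.
func (m *manager) childTerminated(name string, err error) {
	if m.terminated {
		return
	}
	delete(m.children, name)
	if errors.Is(err, errNormal) {
		return // normal termination: manager and siblings carry on
	}
	// Abnormal termination: the manager terminates and takes the
	// remaining children with it.
	m.terminated = true
	m.cause = err
	for child := range m.children {
		delete(m.children, child)
	}
}

func main() {
	m := newManager("a", "b", "c")
	m.childTerminated("a", errNormal)
	fmt.Println(m.terminated, len(m.children)) // prints false 2
	m.childTerminated("b", errCrash)
	fmt.Println(m.terminated, len(m.children), m.cause) // prints true 0 child crashed
}
```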

type ManagerClientBase

type ManagerClientBase struct {
	*ClientBase
}

ManagerClientBase implements the ManagerClient interface.

func SpawnManager

func SpawnManager(log zerolog.Logger, name string) (*ManagerClientBase, error)

func (*ManagerClientBase) Spawn

func (self *ManagerClientBase) Spawn(server Server, name string) (*ClientBase, error)

For the ManagerClient interface.

Synchronously spawns a new actor as a child of the manager. If the manager is not terminated then the error returned is the result of the new actor's Init method.

type ManagerServerBase

type ManagerServerBase struct {
	BackPressureServerBase
	// The child actors of this manager.
	Children map[*TerminationSubscription]Client
}

ManagerServerBase is the server-side for a manager actor. ManagerClientBase is the client-side.

func (*ManagerServerBase) HandleMsg

func (self *ManagerServerBase) HandleMsg(msg mailbox.Msg) (err error)

For the Server interface.

Understands spawn messages, and childTerminated messages.

func (*ManagerServerBase) Init

func (self *ManagerServerBase) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *ClientBase) (err error)

For the Server interface.

func (*ManagerServerBase) Terminated

func (self *ManagerServerBase) Terminated(err error, caughtPanic interface{})

For the Server interface.

Ensures all child actors of the manager are terminated. Termination of all child actors happens concurrently, but this method blocks until all child actors have terminated.

type MsgSync

type MsgSync interface {
	// Used by the actor server-side; returns false iff it was already
	// marked as processed.
	//
	// Once reply fields have been set in the message, call this to
	// signal to any waiting client that the message has been processed
	// and that reply values can now be safely read.
	MarkProcessed() bool

	// Used by the actor client-side; returns true iff MarkProcessed() has
	// been called on this msg. Blocks until either MarkProcessed() is
	// called, or it is known that MarkProcessed() can never be called
	// (i.e. the server has died before processing the msg).
	WaitForReply() bool
	// contains filtered or unexported methods
}

Messages that require a response can implement this interface (by embedding MsgSyncBase). That allows the client-side to wait to receive a response, and the server-side to signal when the response has been provided.

The expectation is that message structs that embed MsgSyncBase also include both query and reply fields.
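A message with both query and reply fields might look like the sketch below. It does not use the package: syncBase is a hypothetical, much-reduced stand-in for MsgSyncBase (it does not return a bool, and it does not cover the case where the server dies before processing the message), and lengthMsg, serve, ask and lengthOf are illustrative names.

```go
package main

import "fmt"

// syncBase is a reduced, hypothetical stand-in for MsgSyncBase.
type syncBase struct {
	processed chan struct{}
}

func (s *syncBase) markProcessed() { close(s.processed) } // server-side
func (s *syncBase) waitForReply()  { <-s.processed }      // client-side

// lengthMsg carries a query field set by the client and a reply field
// set by the server.
type lengthMsg struct {
	syncBase
	Query string
	Reply int
}

// serve is the server-side: fill in the reply, then mark as processed.
func serve(mailbox <-chan *lengthMsg) {
	for msg := range mailbox {
		msg.Reply = len(msg.Query)
		msg.markProcessed()
	}
}

// ask is the client-side: post, wait, and only then read the reply field.
func ask(mailbox chan<- *lengthMsg, query string) int {
	msg := &lengthMsg{syncBase: syncBase{processed: make(chan struct{})}, Query: query}
	mailbox <- msg
	msg.waitForReply()
	return msg.Reply
}

// lengthOf wires a server and a single request together for the demo.
func lengthOf(query string) int {
	mailbox := make(chan *lengthMsg)
	go serve(mailbox)
	defer close(mailbox)
	return ask(mailbox, query)
}

func main() {
	fmt.Println(lengthOf("actor")) // prints 5
}
```

Reading Reply before waitForReply returns would be a data race, which is why the client waits first.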

type MsgSyncBase

type MsgSyncBase struct {
	// contains filtered or unexported fields
}

Embed MsgSyncBase anonymously within each message which requires a response from an actor server.

func (*MsgSyncBase) MarkProcessed

func (self *MsgSyncBase) MarkProcessed() bool

For the MsgSync interface.

func (*MsgSyncBase) WaitForReply

func (self *MsgSyncBase) WaitForReply() bool

For the MsgSync interface.

type Server

type Server interface {
	// This is called by the actor's new Go-routine once it's up and
	// running. If Init() returns a non-nil err, then the actor
	// terminates, and the error will be returned as the result of
	// Spawn. Spawn will block until Init() completes.
	Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *ClientBase) (err error)
	// Called by the actor's Go-routine for each message received from
	// its mailbox. If HandleMsg() returns a non-nil error then the
	// actor terminates.
	HandleMsg(msg mailbox.Msg) (err error)
	// Called whenever the actor terminates.
	Terminated(err error, caughtPanic interface{})

	Base() *ServerBase
}

Low-level server-side interface to every actor.

type ServerBase

type ServerBase struct {
	Log           zerolog.Logger
	MailboxReader *mailbox.MailboxReader
	SelfClient    *ClientBase
	// contains filtered or unexported fields
}

Embed ServerBase (or BackPressureServerBase) within the struct for the server-side of your actors.

func (*ServerBase) Base

func (self *ServerBase) Base() *ServerBase

For the Server interface.

Provides access to self.

func (*ServerBase) HandleMsg

func (self *ServerBase) HandleMsg(msg mailbox.Msg) (err error)

For the Server interface.

Understands termination subscriptions, and terminate messages.

func (*ServerBase) Init

func (self *ServerBase) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *ClientBase) (err error)

For the Server interface.

Sets the MailboxReader and SelfClient fields.

func (*ServerBase) Terminated

func (self *ServerBase) Terminated(err error, caughtPanic interface{})

For the Server interface.

Fires all termination subscriptions in new go-routines. Does not wait for them to complete. Does not panic or repanic regardless of caughtPanic.

type TerminationSubscription

type TerminationSubscription struct {
	// contains filtered or unexported fields
}

func (*TerminationSubscription) Cancel

func (self *TerminationSubscription) Cancel() bool

Cancels the subscription. If this returns true then it is guaranteed the callback function has not been invoked and will not be invoked. If this returns false then it could mean the callback function has already been invoked (or at least started), or it could mean the subscription has already been cancelled. As the callback function is invoked in a fresh go-routine, it is entirely possible for the callback function to be running concurrently with a call to Cancel, in which case, Cancel() can return false before the callback function has run to completion.
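The return values described above can be reproduced with a mutex-guarded flag, as in the sketch below. This is an assumption about one possible design, not the package's code; subscription, cancel, fire and demo are hypothetical names. A subscription is "spent" once it has either fired or been cancelled, and cancel returns true only if it was the call that spent it.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for TerminationSubscription.
type subscription struct {
	mu    sync.Mutex
	spent bool // true once fired or cancelled
	fn    func()
}

// cancel returns true iff the callback has not been, and will not be, invoked.
func (s *subscription) cancel() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.spent {
		return false
	}
	s.spent = true
	return true
}

// fire starts the callback in a fresh Go-routine unless the
// subscription is already spent. It reports whether it did so.
func (s *subscription) fire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.spent {
		return false
	}
	s.spent = true
	go s.fn()
	return true
}

func demo() (cancelFirst, cancelAgain, fireAfterCancel, cancelAfterFire bool) {
	a := &subscription{fn: func() {}}
	cancelFirst = a.cancel()   // nothing has happened yet
	cancelAgain = a.cancel()   // already cancelled
	fireAfterCancel = a.fire() // a cancelled subscription never fires

	ran := make(chan struct{})
	b := &subscription{fn: func() { close(ran) }}
	b.fire()
	// The callback may still be running here; cancel reports false regardless.
	cancelAfterFire = b.cancel()
	<-ran
	return
}

func main() {
	fmt.Println(demo()) // prints true false false false
}
```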

Directories

Path Synopsis
Every Mailbox is safe for multiple concurrent writers, and one reader.