Documentation
Index ¶
- Constants
- Variables
- type ActorHook
- type ActorHooks
- type BackPressureClientBase
- type BackPressureClientBaseFactory
- type BackPressureServerBase
- type Client
- type ClientBase
- func (self *ClientBase) OnTermination(fun func(subscription *TerminationSubscription, err error, caughtPanic any)) *TerminationSubscription
- func (self *ClientBase) Send(msg any) (success bool)
- func (self *ClientBase) SendSync(msg MsgSync, waitForReply bool) (success bool)
- func (self *ClientBase) TerminateSync()
- type FiniteStateMachine
- type FiniteStateMachineState
- type ManagerClient
- type ManagerClientBase
- type ManagerServerBase
- type MsgSync
- type MsgSyncBase
- type Server
- type ServerBase
- type TerminationSubscription
Constants ¶
const QuotaSize = 1000
Variables ¶
Use this error (referred to below as ErrNormalActorTermination) as the return value from any actor server-side callback to indicate that the actor should terminate normally, i.e. no real error has occurred and the actor just wishes to terminate.
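The convention above can be sketched without importing the package. This is a minimal illustration, not the package's implementation: errNormalTermination, handleMsg and run are hypothetical local stand-ins, and the loop only models how a caller might tell a normal shutdown apart from a real failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errNormalTermination is a local stand-in for the package's
// normal-termination error; the name and message are hypothetical.
var errNormalTermination = errors.New("normal actor termination")

// handleMsg plays the role of a server-side callback: it asks for a
// normal shutdown on "stop" and reports a real failure on "boom".
func handleMsg(msg string) error {
	switch msg {
	case "stop":
		return errNormalTermination
	case "boom":
		return errors.New("something went wrong")
	default:
		return nil
	}
}

// run feeds msgs to handleMsg until one returns a non-nil error. It
// reports how many messages were handled and whether the loop ended
// normally (sentinel error, or no error at all).
func run(msgs []string) (handled int, normal bool) {
	for _, msg := range msgs {
		handled++
		if err := handleMsg(msg); err != nil {
			return handled, errors.Is(err, errNormalTermination)
		}
	}
	return handled, true
}

func main() {
	fmt.Println(run([]string{"a", "stop", "b"})) // 2 true
	fmt.Println(run([]string{"a", "boom"}))      // 2 false
}
```

Comparing with errors.Is rather than == keeps the check working if the sentinel is ever wrapped; whether the package itself wraps the error is not stated here.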
Functions ¶
This section is empty.
Types ¶
type ActorHook ¶
type ActorHook struct {
	PreReceiveHook func()
	PostSendHook   func()
}
type BackPressureClientBase ¶
type BackPressureClientBase struct {
	*ClientBase
	// contains filtered or unexported fields
}
func (*BackPressureClientBase) Send ¶
func (self *BackPressureClientBase) Send(msg any) (success bool)
Same as ClientBase.Send(), i.e. posts the message to the actor's mailbox, but does additional work to obey the current back-pressure mechanism.
func (*BackPressureClientBase) SendSync ¶
func (self *BackPressureClientBase) SendSync(msg MsgSync, waitForReply bool) (success bool)
Same as ClientBase.SendSync(), i.e. posts a MsgSync message to the actor's mailbox and optionally waits for the server to reply or terminate, but does additional work to obey the current back-pressure mechanism.
type BackPressureClientBaseFactory ¶
type BackPressureClientBaseFactory struct {
	// contains filtered or unexported fields
}
BackPressureClientBaseFactory must be used together with BackPressureServerBase. It allocates a quota to each client, which means each client now carries some mutable state. For this reason, the client-side should follow a factory pattern: i.e. the ClientBase that is returned from Spawn() should not be used directly; instead pass it to NewBackPressureClientBaseFactory and call NewClient() on that.
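The factory pattern described above can be illustrated with local stand-in types. This sketch does not use the package; base, quotaClient and factory are hypothetical names, and it only shows why per-client mutable state pushes the design towards a factory: every client shares the same underlying handle but owns its own quota counter.

```go
package main

import "fmt"

// base stands in for the shared client handle returned when an actor
// is spawned. It carries no per-client mutable state.
type base struct{ name string }

// quotaClient wraps the shared base with its own mutable quota.
type quotaClient struct {
	*base
	remaining int
}

// factory remembers the shared base and the quota each client gets.
type factory struct {
	base  *base
	quota int
}

func newFactory(b *base, quota int) *factory {
	return &factory{base: b, quota: quota}
}

// newClient returns a client with a fresh quota. Clients from the
// same factory share the base but never share the counter.
func (f *factory) newClient() *quotaClient {
	return &quotaClient{base: f.base, remaining: f.quota}
}

// use consumes one unit of this client's quota only.
func (c *quotaClient) use() { c.remaining-- }

func main() {
	f := newFactory(&base{name: "worker"}, 1000)
	a, b := f.newClient(), f.newClient()
	a.use()
	fmt.Println(a.remaining, b.remaining, a.base == b.base) // 999 1000 true
}
```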
func NewBackPressureClientBaseFactory ¶
func NewBackPressureClientBaseFactory(client *ClientBase) *BackPressureClientBaseFactory
func (*BackPressureClientBaseFactory) NewClient ¶
func (self *BackPressureClientBaseFactory) NewClient() *BackPressureClientBase
type BackPressureServerBase ¶
type BackPressureServerBase struct {
	ServerBase
}
This is deliberately a very simple implementation of back pressure: every 1000 sends from each client, we force the client to do a round-trip to the server. If the server is overloaded, this will force clients to block and wait.
It is almost certainly possible to do something more sophisticated, probably with ingress and egress rates, and trying to minimise queue lengths to optimise for latency. But I couldn't make it work, so, KISS.
You can use BackPressureServerBase in place of ServerBase. If you do so, you must also use BackPressureClientBaseFactory in place of ClientBase.
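The quota mechanism described above can be sketched with plain channels. This is a model under stated assumptions, not the package's code: quotaMsg, client, serve and runDemo are hypothetical, and quotaSize is set to 3 so the effect is visible with a handful of messages. The client counts its sends and, once the quota is used up, waits for the server to reach a round-trip message before sending more.

```go
package main

import "fmt"

// quotaSize mirrors the idea of QuotaSize, kept small for the demo.
const quotaSize = 3

// quotaMsg is a hypothetical round-trip message: the server closes
// done once it has reached this message in its mailbox.
type quotaMsg struct{ done chan struct{} }

// client carries mutable per-client state: the sends left in its quota.
type client struct {
	mailbox   chan<- any
	remaining int
	syncs     int // round-trips performed, recorded for illustration
}

// send posts msg. Once the quota is used up it first waits for the
// server to catch up, which is what slows a fast client down.
func (c *client) send(msg any) {
	if c.remaining == 0 {
		q := quotaMsg{done: make(chan struct{})}
		c.mailbox <- q
		<-q.done
		c.syncs++
		c.remaining = quotaSize
	}
	c.remaining--
	c.mailbox <- msg
}

// serve drains the mailbox, answering quota messages as it meets
// them, and reports how many ordinary messages it handled.
func serve(mailbox <-chan any, handled chan<- int) {
	count := 0
	for msg := range mailbox {
		if q, ok := msg.(quotaMsg); ok {
			close(q.done)
			continue
		}
		count++
	}
	handled <- count
}

// runDemo sends n messages through one client and returns how many
// the server handled and how many round-trips the client made.
func runDemo(n int) (handled, syncs int) {
	mailbox := make(chan any, 64)
	result := make(chan int)
	go serve(mailbox, result)
	c := &client{mailbox: mailbox, remaining: quotaSize}
	for i := 0; i < n; i++ {
		c.send(i)
	}
	close(mailbox)
	return <-result, c.syncs
}

func main() {
	fmt.Println(runDemo(10)) // 10 3
}
```

Because the round-trip message queues behind everything already in the mailbox, a busy server delays the reply and the client blocks for longer, which is the back-pressure effect.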
func (*BackPressureServerBase) HandleMsg ¶
func (self *BackPressureServerBase) HandleMsg(msg any) (err error)
For the Server interface.
Understands quota messages.
type Client ¶
type Client interface {
	// Post a message into the actor's mailbox. Returns true iff it was
	// possible to add the message to the actor's mailbox. There is no
	// guarantee that the actor will retrieve and process the message
	// before it terminates.
	Send(msg any) (success bool)

	// Post a synchronous message into the actor's mailbox. A
	// synchronous message can be waited upon for the message to be
	// processed by the server-side of the actor.
	//
	// If waitForReply is true, then this method will block until:
	//
	// a) the mailbox is closed before we can enqueue the message
	// (the actor has terminated), in which case we return false.
	//
	// b) the message is enqueued, but the actor terminates before it
	// can process the message, in which case we return false.
	//
	// c) the message is enqueued, retrieved, processed by the actor,
	// and the actor marks the processing of the message as being
	// complete, in which case we return true.
	//
	// If waitForReply is false then SendSync will return true iff it
	// is able to enqueue the message into the mailbox (case (a)
	// above). It is then up to the caller to invoke msg.WaitForReply()
	// before it accesses any reply fields in the message.
	SendSync(msg MsgSync, waitForReply bool) (success bool)

	// Request the actor terminates. Does not return until the actor
	// has terminated: the server-side Terminated method must have
	// finished before this returns. Idempotent.
	TerminateSync()

	// Creates a subscription to observe the termination of the actor.
	//
	// If the subscription cannot be created (the actor terminated
	// before the subscription could be registered) then the returned
	// value will be nil. In this case, it is guaranteed the callback
	// function will never be invoked.
	//
	// If the returned value is non-nil then it is guaranteed the
	// callback function will be invoked exactly once when the actor
	// terminates (unless the subscription is cancelled before the
	// actor terminates).
	//
	// If the callback function is invoked, it is invoked in a fresh
	// go-routine, and does not block the termination of the actor. It
	// is invoked with the exact same subscription object as the method
	// returns (which is useful for identification purposes), along
	// with the error, if any, which caused the actor to terminate.
	OnTermination(func(subscription *TerminationSubscription, err error, caughtPanic any)) *TerminationSubscription
}
Low-level client-side interface to every actor.
type ClientBase ¶
type ClientBase struct {
	// contains filtered or unexported fields
}
ClientBase implements the Client interface and provides the basic low-level client-side functionality to send messages to your actor.
func Spawn ¶
Synchronously creates a new actor. The error returned is the error from the new actor's server-side Init method.
func SpawnWithExtraHooks ¶
func SpawnWithExtraHooks(log zerolog.Logger, hooks ActorHooks, server Server, name string) (*ClientBase, error)
func (*ClientBase) OnTermination ¶
func (self *ClientBase) OnTermination(fun func(subscription *TerminationSubscription, err error, caughtPanic any)) *TerminationSubscription
For the Client interface.
func (*ClientBase) Send ¶
func (self *ClientBase) Send(msg any) (success bool)
For the Client interface.
func (*ClientBase) SendSync ¶
func (self *ClientBase) SendSync(msg MsgSync, waitForReply bool) (success bool)
For the Client interface.
func (*ClientBase) TerminateSync ¶
func (self *ClientBase) TerminateSync()
For the Client interface.
type FiniteStateMachine ¶
type FiniteStateMachine struct {
	Server
	// contains filtered or unexported fields
}
A FiniteStateMachine is an actor server which forwards messages to its current state (a FiniteStateMachineState). The current state can return a nextState, or it can explicitly call Become to set the next state.
Whenever the state changes, Enter is called immediately on the new state.
func (*FiniteStateMachine) Become ¶
func (self *FiniteStateMachine) Become(nextState FiniteStateMachineState) (err error)
Sets the FSM's state to be nextState. If nextState is non-nil and different from the current state, returns the result of nextState.Enter().
func (*FiniteStateMachine) HandleMsg ¶
func (self *FiniteStateMachine) HandleMsg(msg any) (err error)
For the Server interface.
Calls HandleMsg on the FSM's state. If that handler returns a nil error, returns the result of Become(nextState).
type FiniteStateMachineState ¶
type FiniteStateMachineState interface {
	// Enter is called immediately as soon as this state becomes the
	// current state for the FiniteStateMachine. It is only called if
	// the previous current state was different.
	Enter() (err error)

	// HandleMsg serves the same purpose as Server.HandleMsg, only it's
	// extended here to allow the nextState to be set. You are allowed
	// to return nil as the nextState, which is interpreted as
	// no-change.
	HandleMsg(msg any) (nextState FiniteStateMachineState, err error)
}
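The state-transition rules above (forward the message to the current state, treat a nil nextState as no change, call Enter only when the state actually changes) can be sketched with local types. Nothing here comes from the package: state, machine, off, on and the package-level trace slice are hypothetical and exist only to make the control flow visible.

```go
package main

import "fmt"

// state is a local stand-in shaped like FiniteStateMachineState.
type state interface {
	Enter() error
	HandleMsg(msg any) (next state, err error)
}

// trace records every Enter call so the transitions can be inspected.
var trace []string

type off struct{}
type on struct{}

func (off) Enter() error { trace = append(trace, "enter off"); return nil }
func (off) HandleMsg(msg any) (state, error) {
	if msg == "toggle" {
		return on{}, nil
	}
	return nil, nil // nil next state means no change
}

func (on) Enter() error { trace = append(trace, "enter on"); return nil }
func (on) HandleMsg(msg any) (state, error) {
	if msg == "toggle" {
		return off{}, nil
	}
	return nil, nil
}

// machine forwards each message to its current state.
type machine struct{ current state }

// become switches state and calls Enter, but only if next is non-nil
// and differs from the current state.
func (m *machine) become(next state) error {
	if next == nil || next == m.current {
		return nil
	}
	m.current = next
	return next.Enter()
}

// handleMsg delegates to the current state, then applies its result.
func (m *machine) handleMsg(msg any) error {
	next, err := m.current.HandleMsg(msg)
	if err != nil {
		return err
	}
	return m.become(next)
}

// run starts in the off state, feeds msgs through the machine, and
// returns the recorded Enter calls.
func run(msgs ...any) []string {
	trace = nil
	m := &machine{}
	if err := m.become(off{}); err != nil {
		return trace
	}
	for _, msg := range msgs {
		if err := m.handleMsg(msg); err != nil {
			break
		}
	}
	return trace
}

func main() {
	fmt.Println(run("ping", "toggle", "ping", "toggle")) // [enter off enter on enter off]
}
```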
type ManagerClient ¶
type ManagerClient interface {
	Client

	// Spawn a new actor based on the provided Server and name. The new
	// actor is a child of the manager.
	Spawn(server Server, name string) (*ClientBase, error)
}
Managers exist to support the management of child actors.
• If a child actor terminates with ErrNormalActorTermination (normal termination) then the manager and all its other children continue to work.
• If a child actor terminates for any other reason (abnormal termination) then the manager actor itself terminates.
• Whenever the manager terminates, it makes sure that all its child actors have terminated.
• Because TerminateSync is synchronous, calling TerminateSync on a manager will not return until all its children have fully terminated too.
type ManagerClientBase ¶
type ManagerClientBase struct {
	*ClientBase
}
ManagerClientBase implements the ManagerClient interface.
func (*ManagerClientBase) Spawn ¶
func (self *ManagerClientBase) Spawn(server Server, name string) (*ClientBase, error)
For the ManagerClient interface.
Synchronously spawns a new actor as a child of the manager. If the manager is not terminated then the error returned is the result of the new actor's Init method.
type ManagerServerBase ¶
type ManagerServerBase struct {
	BackPressureServerBase

	// The child actors of this manager.
	Children map[*TerminationSubscription]Client
}
ManagerServerBase is the server-side for a manager actor. ManagerClientBase is the client-side.
func (*ManagerServerBase) HandleMsg ¶
func (self *ManagerServerBase) HandleMsg(msg any) (err error)
For the Server interface.
Understands spawn messages, and childTerminated messages.
func (*ManagerServerBase) Init ¶
func (self *ManagerServerBase) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader[any], selfClient *ClientBase) (err error)
For the Server interface.
func (*ManagerServerBase) Terminated ¶
func (self *ManagerServerBase) Terminated(err error, caughtPanic any)
For the Server interface.
Ensures all child actors of the manager are terminated. Termination of all child actors happens concurrently, but this method blocks until all child actors have terminated.
type MsgSync ¶
type MsgSync interface {
	// Used by the actor server-side; returns false iff it was already
	// marked as processed.
	//
	// Once reply fields have been set in the message, call this to
	// signal to any waiting client that the message has been processed
	// and that reply values can now be safely read.
	MarkProcessed() bool

	// Used by the actor client-side; returns true iff MarkProcessed() has
	// been called on this msg. Blocks until either MarkProcessed() is
	// called, or it is known that MarkProcessed() can never be called
	// (i.e. the server has died before processing the msg).
	WaitForReply() bool

	// contains filtered or unexported methods
}
Messages that require a response can implement this interface (by embedding MsgSyncBase). That allows the client-side to wait to receive a response, and the server-side to signal when the response has been provided.
The expectation is that message structs that embed MsgSyncBase also include both query and reply fields.
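The request/reply handshake described above can be modelled with a mutex and a channel. This sketch is a local imitation and not MsgSyncBase itself: syncBase, lookupMsg, settle and abandon are hypothetical names, and abandon stands in for whatever the package does when the server dies before processing the message. It shows a message struct carrying both a query field and a reply field, with the server setting the reply before signalling.

```go
package main

import (
	"fmt"
	"sync"
)

// syncBase is a local stand-in for the role MsgSyncBase plays.
type syncBase struct {
	mu      sync.Mutex
	settled bool          // true once the outcome is decided
	ok      bool          // true iff markProcessed decided it
	done    chan struct{} // closed when the outcome is decided
}

// settle records the outcome exactly once; later calls return false.
func (b *syncBase) settle(processed bool) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.settled {
		return false
	}
	b.settled, b.ok = true, processed
	close(b.done)
	return true
}

// markProcessed is called by the server after setting reply fields.
func (b *syncBase) markProcessed() bool { return b.settle(true) }

// abandon models the server terminating before processing the message.
func (b *syncBase) abandon() { b.settle(false) }

// waitForReply blocks until the outcome is known and reports whether
// the reply fields may be read.
func (b *syncBase) waitForReply() bool {
	<-b.done
	return b.ok
}

// lookupMsg embeds the base and carries a query and a reply field.
type lookupMsg struct {
	syncBase
	key   string // query field, set by the client
	value int    // reply field, set by the server
}

func newLookupMsg(key string) *lookupMsg {
	return &lookupMsg{syncBase: syncBase{done: make(chan struct{})}, key: key}
}

// serve answers each lookup: reply fields first, then the signal.
func serve(mailbox <-chan *lookupMsg, store map[string]int) {
	for msg := range mailbox {
		msg.value = store[msg.key]
		msg.markProcessed()
	}
}

// lookup sends one query to a fresh server and waits for the reply.
func lookup(key string) (int, bool) {
	mailbox := make(chan *lookupMsg)
	go serve(mailbox, map[string]int{"a": 1, "b": 2})
	msg := newLookupMsg(key)
	mailbox <- msg
	ok := msg.waitForReply()
	close(mailbox)
	return msg.value, ok
}

func main() {
	fmt.Println(lookup("b")) // 2 true
	dead := newLookupMsg("a")
	dead.abandon()
	fmt.Println(dead.waitForReply()) // false
}
```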
type MsgSyncBase ¶
type MsgSyncBase struct {
	// contains filtered or unexported fields
}
Embed MsgSyncBase anonymously within each message which requires a response from an actor server.
func (*MsgSyncBase) MarkProcessed ¶
func (self *MsgSyncBase) MarkProcessed() bool
For the MsgSync interface.
func (*MsgSyncBase) WaitForReply ¶
func (self *MsgSyncBase) WaitForReply() bool
For the MsgSync interface.
type Server ¶
type Server interface {
	// This is called by the actor's new Go-routine once it's up and
	// running. If Init() returns a non-nil err, then the actor
	// terminates, and the error will be returned as the result of
	// Spawn. Spawn will block until Init() completes.
	Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader[any], selfClient *ClientBase) (err error)

	// Called by the actor's Go-routine for each message received from
	// its mailbox. If HandleMsg() returns a non-nil error then the
	// actor terminates.
	HandleMsg(msg any) (err error)

	// Called whenever the actor terminates.
	Terminated(err error, caughtPanic any)

	Base() *ServerBase
}
Low-level server-side interface to every actor.
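The lifecycle implied by the Server interface (Init runs on the actor's goroutine, spawning blocks until Init returns, HandleMsg runs per message, and Terminated runs when the actor stops) can be sketched as follows. The interface here is deliberately cut down and every name is hypothetical: there is no logger, mailbox reader, panic handling or Base method, and calling Terminated after a failed Init is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// server is a reduced, hypothetical version of the Server interface.
type server interface {
	Init() error
	HandleMsg(msg any) error
	Terminated(err error)
}

// spawn starts the actor goroutine and blocks until Init has returned,
// handing Init's error back to the caller.
func spawn(s server) (chan<- any, error) {
	mailbox := make(chan any, 16)
	initErr := make(chan error)
	go func() {
		err := s.Init()
		initErr <- err
		if err == nil {
			for msg := range mailbox {
				if err = s.HandleMsg(msg); err != nil {
					break // a non-nil error terminates the actor
				}
			}
		}
		s.Terminated(err)
	}()
	if err := <-initErr; err != nil {
		return nil, err
	}
	return mailbox, nil
}

// counter sums the ints it receives and fails on anything else.
type counter struct {
	total int
	done  chan error // receives the error that ended the actor
}

func (c *counter) Init() error { return nil }
func (c *counter) HandleMsg(msg any) error {
	n, ok := msg.(int)
	if !ok {
		return errors.New("unexpected message")
	}
	c.total += n
	return nil
}
func (c *counter) Terminated(err error) { c.done <- err }

// failing reuses counter but refuses to initialise.
type failing struct{ counter }

func (f *failing) Init() error { return errors.New("init failed") }

func main() {
	c := &counter{done: make(chan error, 1)}
	mailbox, err := spawn(c)
	fmt.Println(err) // <nil>
	mailbox <- 2
	mailbox <- 3
	mailbox <- "bad"
	fmt.Println(<-c.done, c.total) // unexpected message 5

	_, err = spawn(&failing{counter: counter{done: make(chan error, 1)}})
	fmt.Println(err) // init failed
}
```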
type ServerBase ¶
type ServerBase struct {
	Log           zerolog.Logger
	MailboxReader *mailbox.MailboxReader[any]
	SelfClient    *ClientBase
	// contains filtered or unexported fields
}
Embed ServerBase (or BackPressureServerBase) within the struct for the server-side of your actors.
func (*ServerBase) Base ¶
func (self *ServerBase) Base() *ServerBase
For the Server interface.
Provides access to self.
func (*ServerBase) HandleMsg ¶
func (self *ServerBase) HandleMsg(msg any) (err error)
For the Server interface.
Understands termination subscriptions, and terminate messages.
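Since the base type handles its own built-in messages, a server that embeds it would plausibly handle the messages it knows about and pass everything else to the embedded HandleMsg. The sketch below shows that delegation pattern with local types only; baseServer, terminateMsg, errStop and greeter are hypothetical, and how the real ServerBase treats messages it does not recognise is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errStop and terminateMsg are hypothetical stand-ins for a built-in
// "please terminate" message and the error it produces.
var errStop = errors.New("stop requested")

type terminateMsg struct{}

// baseServer stands in for an embeddable base that understands
// built-in messages.
type baseServer struct{ unknown int }

func (b *baseServer) HandleMsg(msg any) error {
	switch msg.(type) {
	case terminateMsg:
		return errStop
	default:
		b.unknown++ // this sketch just counts unrecognised messages
		return nil
	}
}

// greeter embeds the base and adds its own message handling.
type greeter struct {
	baseServer
	greetings []string
}

// HandleMsg handles strings itself and delegates everything else to
// the embedded base, so built-in messages keep working.
func (g *greeter) HandleMsg(msg any) error {
	switch m := msg.(type) {
	case string:
		g.greetings = append(g.greetings, "hello "+m)
		return nil
	default:
		return g.baseServer.HandleMsg(msg)
	}
}

func main() {
	g := &greeter{}
	fmt.Println(g.HandleMsg("world"), g.greetings) // <nil> [hello world]
	fmt.Println(g.HandleMsg(42), g.unknown)        // <nil> 1
	fmt.Println(g.HandleMsg(terminateMsg{}))       // stop requested
}
```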
func (*ServerBase) Init ¶
func (self *ServerBase) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader[any], selfClient *ClientBase) (err error)
For the Server interface.
Sets the MailboxReader and SelfClient fields.
func (*ServerBase) Terminated ¶
func (self *ServerBase) Terminated(err error, caughtPanic any)
For the Server interface.
Fires all termination subscriptions in new go-routines. Does not wait for them to complete. Does not panic or repanic regardless of caughtPanic.
type TerminationSubscription ¶
type TerminationSubscription struct {
	// contains filtered or unexported fields
}
func (*TerminationSubscription) Cancel ¶
func (self *TerminationSubscription) Cancel() bool
Cancels the subscription. If this returns true then it is guaranteed the callback function has not been invoked and will not be invoked. If this returns false then it could mean the callback function has already been invoked (or at least started), or it could mean the subscription has already been cancelled. As the callback function is invoked in a fresh go-routine, it is entirely possible for the callback function to be running concurrently with a call to Cancel, in which case, Cancel() can return false before the callback function has run to completion.
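The guarantees above amount to a race between two transitions out of a pending state, where only one can win. One way to model that is a compare-and-swap on a state word, as in this sketch. The subscription type, its states and fire are hypothetical and are not taken from the package; the WaitGroup exists only so the demo can wait for the callback goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	pending int32 = iota
	cancelled
	fired
)

// subscription is a local stand-in for TerminationSubscription.
type subscription struct {
	state    int32
	callback func()
}

// cancel succeeds only if the subscription is still pending, in which
// case the callback can never run.
func (s *subscription) cancel() bool {
	return atomic.CompareAndSwapInt32(&s.state, pending, cancelled)
}

// fire starts the callback in a fresh goroutine unless the
// subscription was cancelled or has already fired. It reports whether
// the callback was started.
func (s *subscription) fire(wg *sync.WaitGroup) bool {
	if !atomic.CompareAndSwapInt32(&s.state, pending, fired) {
		return false
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.callback()
	}()
	return true
}

// demo runs cancel and fire in the given order and reports what
// cancel returned and how many times the callback ran.
func demo(cancelFirst bool) (cancelOK bool, calls int32) {
	var wg sync.WaitGroup
	var n int32
	s := &subscription{callback: func() { atomic.AddInt32(&n, 1) }}
	if cancelFirst {
		cancelOK = s.cancel()
		s.fire(&wg)
	} else {
		s.fire(&wg)
		cancelOK = s.cancel()
	}
	wg.Wait()
	return cancelOK, atomic.LoadInt32(&n)
}

func main() {
	fmt.Println(demo(true))  // true 0
	fmt.Println(demo(false)) // false 1
}
```

In this model cancel returning false says only that the pending state was already left; as in the description above, it says nothing about whether the callback has finished running.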