quacktors

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2021 License: MIT Imports: 21 Imported by: 0

README

logo

quacktors

Github Action Go Reference Go Report

quacktors or "quick actors" is a Go framework that brings Erlang/Elixir style concurrency to Go! It allows for message passing, actor monitoring and can even deal with remote actors/systems. Furthermore, it comes with plenty of useful standard modules for building actor model systems (like Supervisors, Relays, etc.). Oh and btw: quacktors is super easy to use!

rootCtx := quacktors.RootContext()

pid := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
    fmt.Println("Hello, quacktors!")
})

rootCtx.Send(pid, quacktors.EmptyMessage{})

Getting started

To get started, you'll need an installation of qpmd (see: qpmd). The quacktor port mapper daemon is responsible for keeping track of all running systems and quacktor instances on your local machine and acts as a "DNS server" for remote machines that want to connect to a local system.

import "github.com/Azer0s/quacktors"

foo := quacktors.NewSystem("foo")

pid := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
    switch m := message.(type) {
    case quacktors.GenericMessage:
        fmt.Println(m.Value)
    }
})

foo.HandleRemote("printer", pid)

quacktors.Run()
rootCtx := quacktors.RootContext()

node := quacktors.Connect("foo@localhost")
printer, ok := node.Remote("printer")

rootCtx.Send(printer, quacktors.GenericMessage{Value: "Hello, world"})

Custom messages

To be able to send and receive messages from remote actors, you have to register your custom messages with quacktors. If you don't need to send a message to a remote machine, you also don't need to register it.

Note: the Type method is used to identify your message across machines (i.e. your message names have to match between machines). The recommended way of naming your types is to use a, sort of, package structure (e.g. "mypackage/MyMessage"). These can then be versioned by appending @{version} afterwards (e.g. "mypackage/MyMessage@v1 could reference the MyMessageV1 struct).

package mypackage

type MyMessage struct {
    Foo string
    Bar float32
}

func (m MyMessage) Type() string {
    return "mypackage/MyMessage"
}

Since GenServer handler names are resolved via Type, GenServers cut the package prefix and append the version if there is any. So "mypackage/MyMessage@v1" could be referenced in a cast handler with HandleMyMessageV1Cast (note: letters in the version name are automatically turned to upper case).

Monitoring actors

quacktors can monitor both local, as well as remote actors. As soon as the monitored actor goes down, a DownMessage is sent out to the monitoring actor.

pid := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
})

quacktors.SpawnWithInit(func(ctx *quacktors.Context) {
    ctx.Monitor(pid)
}, func(ctx *quacktors.Context, message quacktors.Message) {
    switch m := message.(type) {
        case quacktors.DownMessage:
            ctx.Logger.Info("received down message from other actor", 
                "pid", m.String())
            ctx.Quit()
    }
})

quacktors.Run()

Tracing

quacktors supports opentracing out of the box! It's as easy as setting the global tracer (and optionally providing a span to the root context).

cfg := jaegercfg.Configuration{
    ServiceName: "TestNewSystemWithHandler",
    Sampler: &jaegercfg.SamplerConfig{
        Type:  jaeger.SamplerTypeConst,
        Param: 1,
    },
    Reporter: &jaegercfg.ReporterConfig{
        LogSpans: true,
    },
}
tracer, closer, _ := cfg.NewTracer()
defer closer.Close()

opentracing.SetGlobalTracer(tracer)

span := opentracing.GlobalTracer().StartSpan("root")
defer span.Finish()
rootCtx := quacktors.RootContextWithSpan(span)

a1 := quacktors.SpawnWithInit(func(ctx *quacktors.Context) {
    ctx.Trace("a1")
}, func(ctx *quacktors.Context, message quacktors.Message) {
    ctx.Span().SetTag("message_type", message.Type())
    <-time.After(3 * time.Second)
})

rootCtx.Send(a1, quacktors.EmptyMessage{})

quacktors.Run()

Metrics

quacktors has a metric system in place (not the 📏 kind, the 📊 one) and offers many useful components to collect and metrics (like the TimedRecorder and the accompanying TimedRecorderHook to make collecting metrics in a specified interval super easy).

Supervision

quacktors comes with some cool standard components, one of which is the supervisor. The supervisor (as the name implies) supervises one or many named actors and reacts to failures according to a set strategy.

quacktors.SpawnStateful(component.Supervisor(component.ALL_FOR_ONE_STRATEGY, map[string]Actor{
    "1": &superImportantActor{id: 1},
    "2": &superImportantActor{id: 2},
    "3": &superImportantActor{id: 3},
    "4": &superImportantActor{id: 4},
}))

Location transparency

Sending messages in quacktors is completely location transparent, meaning no more worrying about connections, marshalling, unmarshalling, error handling and all that other boring stuff. Just send what you want to whoever you want to send it to. It's that easy.

Floating PIDs

PIDs in quacktors are floating, meaning you can send a PID to a remote machine as a message and use that same PID there as you would use any other PID.

foo := quacktors.NewSystem("foo")

ping := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
    switch m := message.(type) {
    case quacktors.Pid:
        ctx.Logger.Info("ping")
        <- time.After(1 * time.Second)
        ctx.Send(&m, *ctx.Self())
    }
})

foo.HandleRemote("ping", ping)

quacktors.Run()
rootCtx := quacktors.RootContext()

bar := quacktors.NewSystem("bar")

pong := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
    switch m := message.(type) {
    case quacktors.Pid:
        ctx.Logger.Info("pong")
        <- time.After(1 * time.Second)
        ctx.Send(&m, *ctx.Self())
    }
})

bar.HandleRemote("pong", pong)

foo := quacktors.Connect("foo@localhost")
ping := foo.Remote("ping")

rootCtx.Send(ping, *pong)

quacktors.Run()

GenServers

As part of the default component set, quacktors supports Elixir style GenServers. The handlers for these are configured via the method names via reflection. So a GenServer with a Call handler for a PrintRequest would look like so:

type PrintRequest struct {
    //our printing request message
}

func (p PrintRequest) Type() string {
    return "PrintRequest"
}

type Printer struct { 
    //printing magic
}

func (p Printer) InitGenServer(ctx *quacktors.Context) {
	ctx.Trace("printer")
}

func (p Printer) HandlePrintRequestCall(ctx *quacktors.Context, message PrintRequest) Message {
    //print stuff
	
    return quacktors.EmptyMessage{}
}


pid := quacktors.SpawnStateful(genserver.New(Printer{}))
res, err := genserver.Call(pid, PrintRequest{})

So you don't even have to write your own actors if you don't want to. Cool, isn't it?

Quacktor streams

quacktors supports stream processing out of the box. Currently, there is only a connector for Apache Kafka but many more will come in the future.

context := quacktors.RootContext()

consumer, _ := quacktorstreams.NewConsumer(consumerImpl)
producer := quacktorstreams.NewProducer(producerImpl, "test")

pid := quacktors.Spawn(func(ctx *quacktors.Context, message quacktors.Message) {
    fmt.Println(message)
})

_ = consumer.Subscribe("test", pid, func(bytes []byte) (quacktors.Message, error) {
    val := quacktors.GenericMessage{}
    err := json.Unmarshal(bytes, &val)
    return val, err
})

context.Send(producer, quacktors.GenericMessage{Value: 1})
context.Send(producer, quacktors.GenericMessage{Value: 2})

quacktors.Run()

On message order and reception

In quacktors, message order is guaranteed from one actor to another. Meaning that if you send messages from A to B, they will arrive in order. The same is true for remote actors.

For multiple actors (A, B & C send messages to D), we can't make that guarantee because we don't know when each actor will execute.

As with basically all other actor systems, there is no guarantee (or even acknowledgement) that a message has been received. Send is a non-blocking call and doesn't return anything (even if the sending procedure failed).

On PID logging

When starting quacktors for the first time, you might notice that sometimes quacktors logs with a global PID (i.e. PID + machine ID) and sometimes just a local PID is logged. This is because sometimes there is ambiguity as to where (on which machine) a PID lives (e.g. when telling a PID to quit) and other times it's clear that the PID is on the local system (e.g. when starting an actor). Global actor PIDs are named gpid when logging. When we know that a PID lives on a remote machine, we don't only log the gpid but also the machineId.

Configuring quacktors

quacktors has some configuration options which can be set by using the config package during init.

func init() {
    config.SetLogger(&MyCustomLogger{})
    config.SetQpmdPort(7777)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MachineId

func MachineId() string

MachineId returns the local machine id.

func RegisterType

func RegisterType(message Message)

RegisterType registers a Message to the type store so it can be sent to remote machines (which, of course, need a Message with the same Message.Type registered).

func Run added in v0.0.5

func Run()

Run waits until all actors have quit.

Types

type Abortable

type Abortable interface {
	//The Abort function aborts the underlying task (e.g. a Monitor) when called.
	Abort()
}

The Abortable interface defines the methods a struct has to implement so it can be returned by an action that can be canceled. It is very similar to context.Context with the key difference that an Abortable can only Abort and doesn't carry any further details about the underlying action.

type Actor

type Actor interface {
	//Init is called when an Actor is initialized. It is
	//guaranteed to be called before an Actor has been registered
	//or even started. Typically, Init is used to start monitors
	//to other actors or do some setup work. The caller
	//function provides a Context to the Init function.
	//Context can be used to interact with other actors
	//(e.g. send, monitor, etc) or modify the current Actor
	//(e.g. quit, defer actions, etc).
	Init(ctx *Context)

	//Run is called when an Actor receives a Message. The caller
	//function provides both a Context as well as the actual
	//Message to the Run function. Context can then be used to
	//interact with other actors (e.g. send, monitor, etc) or
	//modify the current Actor (e.g. quit, defer actions, etc).
	Run(ctx *Context, message Message)
}

The Actor interface defines the methods a struct has to implement so it can be spawned by quacktors.

type Context

type Context struct {
	Logger contextLogger
	// contains filtered or unexported fields
}

The Context struct defines the actor context and provides ways for an actor to interact with the rest of the system. Actors are provided a Context instance on Init and Run. Actors should only use the provided context to interact with other actors as the Context also stores things like current Span or a pointer to the acto specific send mutex.

func RootContext

func RootContext() Context

RootContext returns a context that can be used outside an Actor. It is not associated with a real PID and therefore, one should not call anything with the RootContext that requires it to be able to receive or the like (e.g. no Context.Quit, Context.Monitor, etc.).

func RootContextWithSpan added in v0.0.5

func RootContextWithSpan(span opentracing.Span) Context

RootContextWithSpan is the same as RootContext but with a tracing span attached to it so it will do distributed tracing.

func VectorContext added in v0.0.5

func VectorContext(name string, span opentracing.Span) Context

VectorContext creates a context with a custom name and opentracing.Span. This allows for integrating applications with quacktors.

func (*Context) Defer

func (c *Context) Defer(action func())

Defer defers an action to after an actor has gone down. The same general advice applies to the Defer function as to the built-in Go defer (e.g. avoid defers in for loops, no nil function defers, etc). Deferred actor functions should not panic (because nothing will happen if they do, quacktors just recovers the panic).

func (*Context) Kill

func (c *Context) Kill(pid *Pid)

Kill kills another actor by its PID.

func (*Context) Monitor

func (c *Context) Monitor(pid *Pid) Abortable

Monitor starts a monitor on another actor. As soon as the actor goes down, a DownMessage is sent to the monitoring actor. Monitor also returns an Abortable so the monitor can be canceled (i.e. no DownMessage will be sent out if the monitored actor goes down).

func (*Context) MonitorMachine

func (c *Context) MonitorMachine(machine *Machine) Abortable

MonitorMachine starts a monitor on a connection to a remote machine. As soon as the remote disconnects, a DisconnectMessage is sent to the monitoring actor. MonitorMachine also returns an Abortable so the monitor can be canceled (i.e. no DisconnectMessage will be sent out if the monitored actor goes down).

func (*Context) PassthroughPoisonPill added in v0.0.6

func (c *Context) PassthroughPoisonPill(val bool)

PassthroughPoisonPill enables message passthrough for PoisonPill messages. If set to true, PoisonPill messages will not shut down the actor but be forwarded to the handler function.

func (*Context) Quit

func (c *Context) Quit()

Quit kills the calling actor.

func (*Context) Self

func (c *Context) Self() *Pid

Self returns the PID of the calling actor.

func (*Context) Send

func (c *Context) Send(to *Pid, message Message)

Send sends a Message to another actor by its PID.

func (*Context) SendAfter

func (c *Context) SendAfter(to *Pid, message Message, duration time.Duration) Abortable

SendAfter schedules a Message to be sent to another actor by its PID after a timer has finished. SendAfter also returns an Abortable so the scheduled Send can be stopped. If the sending actor goes down before the timer has completed, the Send operation is still executed.

func (*Context) Span added in v0.0.5

func (c *Context) Span() opentracing.Span

Span returns the current opentracing.Span. This will always be nil unless Trace was called with a service name in the Init function of the actor.

func (*Context) Trace added in v0.0.5

func (c *Context) Trace(name string)

Trace enables distributed tracing for the actor (quacktors will create a ChildSpan with the operationName set to the provided name).

func (*Context) TraceFork added in v0.0.5

func (c *Context) TraceFork(traceFork func(ctx opentracing.SpanContext) opentracing.SpanReference)

TraceFork sets the default fork mechanism for incoming SpanContexts. By default, this is set to opentracing.FollowsFrom.

type DisconnectMessage

type DisconnectMessage struct {
	//MachineId is the ID of the Machine that disconnected.
	MachineId string
	//Address is the remote address of the Machine.
	Address string
}

The DisconnectMessage is sent to a monitoring Actor whenever a monitored Machine connection goes down.

func (DisconnectMessage) Type

func (d DisconnectMessage) Type() string

Type of DisconnectMessage returns "DisconnectMessage"

type DownMessage

type DownMessage struct {
	//Who is the PID of the Actor that went down.
	Who *Pid
}

The DownMessage is sent to a monitoring Actor whenever a monitored Actor goes down.

func (DownMessage) Type

func (d DownMessage) Type() string

Type of DownMessage returns "DownMessage"

type EmptyMessage

type EmptyMessage struct {
}

The EmptyMessage struct is an empty message without any semantic meaning (i.e. it literally doesn't do anything).

func (EmptyMessage) Type

func (e EmptyMessage) Type() string

Type of EmptyMessage returns "EmptyMessage"

type GenericMessage

type GenericMessage struct {
	//Value is the value a GenericMessage carries.
	Value interface{} `json:"value"`
}

The GenericMessage carries a single Value of type interface{}.

func (GenericMessage) Type

func (g GenericMessage) Type() string

Type of GenericMessage returns "GenericMessage"

type KillMessage

type KillMessage struct {
}

A KillMessage can be sent to an Actor to ask it to shut down. It is entirely semantic, meaning this will not shut the Actor down automatically. Instead, the Actor should clean up whatever it is doing gracefully and then call Context.Quit itself.

func (KillMessage) Type

func (k KillMessage) Type() string

Type of KillMessage returns "KillMessage"

type Machine

type Machine struct {
	MachineId          string
	Address            string
	MessageGatewayPort uint16

	GeneralPurposePort uint16
	// contains filtered or unexported fields
}

Machine is the struct representation of a remote machine.

type Message

type Message interface {
	//Type returns the the "name" of the Message.
	//This is used to identify and correctly unmarshal
	//messages from remote actors.
	Type() string
}

The Message interface defines all methods a struct has to implement so it can be sent around by actors.

type Pid

type Pid struct {
	MachineId string
	Id        string
	// contains filtered or unexported fields
}

The Pid struct acts as a reference to an Actor. It is completely location transparent, meaning it doesn't matter if the Pid is actually on another system. To the developer it will look like just another Actor they can send messages to.

func Spawn

func Spawn(action func(ctx *Context, message Message)) *Pid

Spawn spawns an Actor from an anonymous receive function and returns the *Pid of the Actor.

func SpawnStateful

func SpawnStateful(actor Actor) *Pid

SpawnStateful spawns an Actor.

func SpawnWithInit

func SpawnWithInit(init func(ctx *Context), action func(ctx *Context, message Message)) *Pid

SpawnWithInit spawns an Actor from an anonymous receive function and an anonymous init function and returns the *Pid of the Actor.

func (*Pid) Is

func (pid *Pid) Is(other *Pid) bool

Is compares two PIDs and returns true if their ID and MachineId are the same.

func (*Pid) String

func (pid *Pid) String() string

func (Pid) Type

func (pid Pid) Type() string

Type returns the Message type of the PID. Since PIDs can be sent around without any message wrapper, it needs to implement the Message interface (which is why Type is needed).

type PoisonPill

type PoisonPill struct {
}

A PoisonPill can be sent to an Actor to kill it without aborting current (or already queued) messages. Instead, it is enqueued into the actors mailbox and when the Actor gets to the PoisonPill Message, gracefully shuts it down. A PoisonPill never gets passed on to the Run function. Instead, it just calls Context.Quit on the current actors Context.

func (PoisonPill) Type

func (p PoisonPill) Type() string

Type of PoisonPill returns "PoisonPill"

type RemoteSystem

type RemoteSystem struct {
	MachineId string
	Address   string
	Port      uint16
	Machine   *Machine
}

RemoteSystem is the struct representation of a system (i.e. a collection of PIDs that have been assigned a name) on a remote machine.

func Connect

func Connect(name string) (*RemoteSystem, error)

Connect connects to a remote system and returns a *RemoteSystem and an error (nil if everything went fine). The connection string format should be "system@remote" where "system" is the name of the remote system and "remote" is either an IP or a domain name.

func (*RemoteSystem) Remote

func (r *RemoteSystem) Remote(handlerName string) (*Pid, error)

Remote gets a remote PID by its handler name.

type StatelessActor

type StatelessActor struct {
	InitFunction    func(ctx *Context)
	ReceiveFunction func(ctx *Context, message Message)
}

The StatelessActor struct is the Actor implementation that is used when using Spawn or SpawnWithInit. As the name implies, the StatelessActor doesn't have a state and just requires one anonymous function as the initializer (for Init) and another one as the run function (for Run) to work. ReceiveFunction can be nil, InitFunction has to be set.

func (*StatelessActor) Init

func (s *StatelessActor) Init(ctx *Context)

Init initializes the StatelessActor by calling InitFunction if it is not nil. Init panics if ReceiveFunction is not set.

func (*StatelessActor) Run

func (s *StatelessActor) Run(ctx *Context, message Message)

Run forwards both the Message and the Context to the ReceiveFunction when the StatelessActor receives a message.

type System

type System struct {
	// contains filtered or unexported fields
}

The System struct represents a logical actor system (i.e. a collection of PIDs that have been assigned a handler name so that remote machines can look them up). Furthermore, the System struct also keeps track of the connection to the local qpmd and keeps track of the system server status (the server which is used to look up PIDs by handler names, etc).

func NewSystem

func NewSystem(name string) (*System, error)

NewSystem creates a new system server, connects to qpmd, starts the qpmd heartbeat for the new system and returns a *System and an error (nil if everything went fine).

func (*System) Close

func (s *System) Close()

Close closes the connection to the local qpmd and quits the system server.

func (*System) HandleRemote

func (s *System) HandleRemote(name string, process *Pid)

HandleRemote associates a PID with a handler name.

func (*System) IsClosed

func (s *System) IsClosed() bool

IsClosed returns true if the connection to the local qpmd or the system server were closed.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL