ergo

package module
v1.2.6
Published: Aug 9, 2021 License: MIT Imports: 25 Imported by: 0

README

Ergo Framework

Technologies and design patterns of Erlang/OTP have been proven over the years. Now in Golang. Up to 5 times faster than the original Erlang/OTP in terms of network messaging. The easiest drop-in replacement for your hot Erlang nodes in the cluster.

https://ergo.services

Purpose

The goal of this project is to combine Erlang/OTP experience with Golang performance. Ergo Framework implements the DIST protocol, the ETF data format, and OTP design patterns (GenServer/Supervisor/Application), which lets you build high-performance, reliable microservices with native integration into Erlang infrastructure.

Features

  • Erlang node (run single/multinode)
  • Embedded EPMD (in order to get rid of Erlang dependencies)
  • Spawn Erlang-like processes
  • Register/unregister processes with simple atom
  • GenServer behaviour support (with atomic state)
  • Supervisor behaviour support (with all known restart strategies support)
  • Application behaviour support
  • GenStage behaviour support (originated from Elixir's GenStage)
  • Connect to (accept connection from) any Erlang node within a cluster (or clusters, if running as multinode)
  • Make sync requests with process.Call and async requests with process.Cast or process.Send, in the fashion of gen_server:call, gen_server:cast, and erlang:send respectively
  • Monitor processes/nodes
    • local -> local
    • local -> remote
    • remote -> local
  • Link processes
    • local <-> local
    • local <-> remote
    • remote <-> local
  • RPC callbacks support
  • Experimental observer support
  • Unmarshalling terms into the struct using etf.TermIntoStruct, etf.TermMapIntoStruct or etf.TermProplistIntoStruct
  • Support for Erlang 22 (including the fragmentation feature)
  • Encryption (TLS 1.3) support (including autogenerating self-signed certificates)
  • Tested and confirmed support for Windows, Darwin (macOS), and Linux

Requirements

  • Go 1.15.x and above

Changelog

Here are the changes in the latest release. For more details, see the ChangeLog.

1.2.0 - 2021-04-07

  • Added TLS support. Introduced new option TLSmode in ergo.NodeOptions with the following values:

    • ergo.TLSmodeDisabled default value. encryption is disabled
    • ergo.TLSmodeAuto enables encryption with autogenerated and self-signed certificate
    • ergo.TLSmodeStrict enables encryption with specified server/client certificates and keys

    A usage example is in examples/nodetls/tlsGenServer.go

  • Introduced GenStage behaviour implementation (originated from the Elixir world). GenStage is an abstraction built on top of GenServer that provides a simple way to create a distributed Producer/Consumer architecture while automatically managing backpressure. This implementation is fully compatible with Elixir's GenStage. See the example in examples/genstage, or run go run ./examples/genstage to see it in action.

  • Introduced new methods AddStaticRoute/RemoveStaticRoute for Node. This feature allows you to keep EPMD service behind a firewall.

  • Introduced SetTrapExit/GetTrapExit methods for Process in order to control the trapping {'EXIT', from, reason} message

  • Introduced TermMapIntoStruct and TermProplistIntoStruct functions. It should now be easy to transform an etf.Map or []etf.ProplistElement into the given struct. See the documentation for details.

  • Improved DIST implementation in order to support KeepAlive messages and get rid of platform-dependent syscall usage

  • Fixed TermIntoStruct function. There was a problem with Tuple value transforming into the given struct

  • Fixed incorrect decoding of the atoms true and false into booleans

  • Fixed race condition and freeze of connection serving in corner case #21

  • Fixed problem with monitoring process by the registered name (local and remote)

  • Fixed issue with termination of linked processes

  • Fixed platform-dependent issues. Now Ergo Framework has tested and confirmed support of Linux, MacOS, Windows.

Benchmarks

Here is a simple end-to-end test that demonstrates the performance of the messaging subsystem.

Hardware: laptop with Intel(R) Core(TM) i5-8265U (4 cores, 8 with HT)

Sequential GenServer.Call using two processes running on single and two nodes

❯❯❯❯ go test -bench=NodeSequential -run=XXX -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/halturin/ergo
BenchmarkNodeSequential/number-8 	  256108	     48578 ns/op
BenchmarkNodeSequential/string-8 	  266906	     51531 ns/op
BenchmarkNodeSequential/tuple_(PID)-8         	  233700	     58192 ns/op
BenchmarkNodeSequential/binary_1MB-8          	    5617	   2092495 ns/op
BenchmarkNodeSequentialSingleNode/number-8         	 2527580	      4857 ns/op
BenchmarkNodeSequentialSingleNode/string-8         	 2519410	      4760 ns/op
BenchmarkNodeSequentialSingleNode/tuple_(PID)-8    	 2524701	      4757 ns/op
BenchmarkNodeSequentialSingleNode/binary_1MB-8     	 2521370	      4758 ns/op
PASS
ok  	github.com/halturin/ergo	120.720s

This means Ergo Framework provides around 25,000 sync requests per second via localhost for simple data and around 4 Gbit/s for 1 MB messages.
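
The conversion from the ns/op column to requests per second and to bandwidth can be sketched as follows. This is only illustrative arithmetic over two figures copied from the two-node output above; the helper names are hypothetical and not part of Ergo. With these particular figures the sequential rate comes out a little over 20,000 calls per second, the same order as the rounded number quoted above, and the 1 MB case comes out at about 4 Gbit/s.

```go
package main

import "fmt"

// callsPerSecond converts a `go test -bench` ns/op figure into
// sequential operations per second.
func callsPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

// gbitPerSecond converts a payload size in bytes and an ns/op figure
// into throughput in gigabits per second (1 Gbit = 1e9 bits).
func gbitPerSecond(payloadBytes, nsPerOp float64) float64 {
	bits := payloadBytes * 8
	seconds := nsPerOp / 1e9
	return bits / seconds / 1e9
}

func main() {
	// Figures copied from BenchmarkNodeSequential (two nodes) above.
	fmt.Printf("number:     %.0f calls/s\n", callsPerSecond(48578))
	// The 1 MB payload is assumed to be 1<<20 bytes.
	fmt.Printf("binary 1MB: %.2f Gbit/s\n", gbitPerSecond(1<<20, 2092495))
}
```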

Parallel GenServer.Call using 120 pairs of processes running on a single and two nodes

❯❯❯❯ go test -bench=NodeParallel -run=XXX -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/halturin/ergo
BenchmarkNodeParallel-8        	         2652494	      5246 ns/op
BenchmarkNodeParallelSingleNode-8   	 6100352	      2226 ns/op
PASS
ok  	github.com/halturin/ergo	34.145s

These numbers show around 260,000 sync requests per second via localhost using simple data for messaging.

vs original Erlang/OTP

[Figure: benchmarks against original Erlang/OTP]

sources of these benchmarks are here

EPMD

Ergo Framework has an embedded EPMD implementation, so your node can run without an external epmd process. By default, it works as a client with Erlang's epmd daemon or with other ergo nodes.

The one thing that makes the embedded EPMD different is how it handles connection hangs: if an ergo node is running as an EPMD client and loses the connection, it tries either to run its own embedded EPMD service or to restore the lost connection.

As an extra option, we provide the EPMD service as a standalone application. It is a simple drop-in replacement for the original Erlang epmd daemon.

go get -u github.com/halturin/ergo/cmd/epmd

Multinode

This feature allows you to create two or more nodes within a single running instance. The only requirement is to specify a different set of options for each node (such as node name, EPMD port number, secret cookie). You may also want to use this feature to create a 'proxy' node between clusters. See Examples for more details.

Observer

It allows you to see most metrics/information using the standard tool from the Erlang distribution. The example below shows this feature in action using one of the examples:

[Figure: observer demo]

Examples

The code below is a simple implementation of the GenServer pattern (examples/simple/GenServer.go):

package main

import (
	"fmt"
	"time"

	"github.com/halturin/ergo"
	"github.com/halturin/ergo/etf"
)

type ExampleGenServer struct {
	ergo.GenServer
	process *ergo.Process
}

type State struct {
	value int
}

func (egs *ExampleGenServer) Init(p *ergo.Process, args ...interface{}) (state interface{}) {
	fmt.Printf("Init: args %v \n", args)
	egs.process = p
	InitialState := &State{
		value: args[0].(int), // 100
	}
	return InitialState
}

func (egs *ExampleGenServer) HandleCast(message etf.Term, state interface{}) (string, interface{}) {
	fmt.Printf("HandleCast: %#v (state value %d) \n", message, state.(*State).value)
	time.Sleep(1 * time.Second)
	state.(*State).value++

	if state.(*State).value > 103 {
		egs.process.Send(egs.process.Self(), "hello")
	} else {
		egs.process.Cast(egs.process.Self(), "hi")
	}

	return "noreply", state
}

func (egs *ExampleGenServer) HandleCall(from etf.Tuple, message etf.Term, state interface{}) (string, etf.Term, interface{}) {
	fmt.Printf("HandleCall: %#v, From: %#v\n", message, from)
	return "reply", message, state
}

func (egs *ExampleGenServer) HandleInfo(message etf.Term, state interface{}) (string, interface{}) {
	fmt.Printf("HandleInfo: %#v (state value %d) \n", message, state.(*State).value)
	time.Sleep(1 * time.Second)
	state.(*State).value++
	if state.(*State).value > 106 {
		return "stop", "normal"
	} else {
		egs.process.Send(egs.process.Self(), "hello")
	}
	return "noreply", state
}

func (egs *ExampleGenServer) Terminate(reason string, state interface{}) {
	fmt.Printf("Terminate: %#v \n", reason)
}

func main() {
	node := ergo.CreateNode("node@localhost", "cookies", ergo.NodeOptions{})

	gs1 := &ExampleGenServer{}
	process, _ := node.Spawn("gs1", ergo.ProcessOptions{}, gs1, 100)
	process.Cast(process.Self(), "hey")

	process.Wait()
	fmt.Println("exited")
}

Here is the output of this code:

$ go run ./examples/simple/GenServer.go
Init: args [100]
HandleCast: "hey" (state value 100)
HandleCast: "hi" (state value 101)
HandleCast: "hi" (state value 102)
HandleCast: "hi" (state value 103)
HandleInfo: "hello" (state value 104)
HandleInfo: "hello" (state value 105)
HandleInfo: "hello" (state value 106)
Terminate: "normal"
exited

See examples/ for more details

Elixir Phoenix Users

Users of the Elixir Phoenix framework might encounter timeouts when trying to connect a Phoenix node to an ergo node. The reason is that, in addition to global_name_server and net_kernel, Phoenix attempts to broadcast messages to the pg2 PubSub handler.

To work with Phoenix nodes, you must create and register a dedicated pg2 GenServer, and spawn it inside your node. Take inspiration from the global_name_server.go for the rest of the GenServer methods, but the Spawn must have "pg2" as a process name:

type Pg2GenServer struct {
    ergo.GenServer
}

func main() {
    // ...
    pg2 := &Pg2GenServer{}
    node1 := ergo.CreateNode("node1@localhost", "cookies", ergo.NodeOptions{})
    process, _ := node1.Spawn("pg2", ergo.ProcessOptions{}, pg2, nil)
    // ...
}

Development and debugging

A couple of options are already defined that you might want to use:

  • -trace.node
  • -trace.dist

To enable the Go profiler, add --tags debug to your go run or go build command, like this:

go run --tags debug ./examples/genserver/demoGenServer.go

The Go profiler is now available at http://localhost:9009/debug/pprof

Companies are using Ergo Framework

Kaspersky RingCentral LilithGames

Is your company using Ergo? Add your company logo/name here.

Commercial support

Please visit https://ergo.services for more information.

Documentation

Index

Constants

const (

	// ApplicationStartPermanent If a permanent application terminates,
	// all other applications and the runtime system (node) are also terminated.
	ApplicationStartPermanent = "permanent"

	// ApplicationStartTemporary If a temporary application terminates,
	// this is reported but no other applications are terminated.
	ApplicationStartTemporary = "temporary"

	// ApplicationStartTransient If a transient application terminates
	// with reason normal, this is reported but no other applications are
	// terminated. If a transient application terminates abnormally, that
	// is with any other reason than normal, all other applications and
	// the runtime system (node) are also terminated.
	ApplicationStartTransient = "transient"
)
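
The effect of the three start types described in the comments above can be summarised as a small decision function. This is a minimal sketch based only on those comments: nodeTerminates is a hypothetical helper (not part of the ergo API), exit reasons are modelled as plain strings, and the constants are local copies of the string values listed above.

```go
package main

import "fmt"

// Local copies of the start type values listed in the constants above.
const (
	startPermanent = "permanent"
	startTemporary = "temporary"
	startTransient = "transient"
)

// nodeTerminates reports whether the termination of an application with
// the given start type and exit reason also takes down the other
// applications and the node. Hypothetical helper for illustration.
func nodeTerminates(startType, reason string) bool {
	switch startType {
	case startPermanent:
		return true // any termination stops the node
	case startTransient:
		return reason != "normal" // only abnormal termination stops the node
	default:
		return false // temporary: termination is only reported
	}
}

func main() {
	for _, st := range []string{startPermanent, startTemporary, startTransient} {
		fmt.Printf("%-9s normal=%v crash=%v\n", st,
			nodeTerminates(st, "normal"), nodeTerminates(st, "crash"))
	}
}
```
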
const (

	// SupervisorRestartIntensity
	SupervisorRestartIntensity = uint16(10)

	// SupervisorRestartPeriod
	SupervisorRestartPeriod = uint16(10)

	// SupervisorStrategyOneForOne If one child process terminates and is to be restarted, only
	// that child process is affected. This is the default restart strategy.
	SupervisorStrategyOneForOne = SupervisorStrategyType("one_for_one")

	// SupervisorStrategyOneForAll If one child process terminates and is to be restarted, all other
	// child processes are terminated and then all child processes are restarted.
	SupervisorStrategyOneForAll = SupervisorStrategyType("one_for_all")

	// SupervisorStrategyRestForOne If one child process terminates and is to be restarted,
	// the 'rest' of the child processes (that is, the child
	// processes after the terminated child process in the start order)
	// are terminated. Then the terminated child process and all
	// child processes after it are restarted
	SupervisorStrategyRestForOne = SupervisorStrategyType("rest_for_one")

	// SupervisorStrategySimpleOneForOne A simplified one_for_one supervisor, where all
	// child processes are dynamically added instances
	// of the same process type, that is, running the same code.
	SupervisorStrategySimpleOneForOne = SupervisorStrategyType("simple_one_for_one")

	// SupervisorChildRestartPermanent child process is always restarted
	SupervisorChildRestartPermanent = SupervisorChildRestart("permanent")

	// SupervisorChildRestartTemporary child process is never restarted
	// (not even when the supervisor restart strategy is rest_for_one
	// or one_for_all and a sibling death causes the temporary process
	// to be terminated)
	SupervisorChildRestartTemporary = SupervisorChildRestart("temporary")

	// SupervisorChildRestartTransient child process is restarted only if
	// it terminates abnormally, that is, with an exit reason other
	// than normal, shutdown, or {shutdown,Term}.
	SupervisorChildRestartTransient = SupervisorChildRestart("transient")

	// SupervisorChildShutdownBrutal means that the child process is
	// unconditionally terminated using process' Kill method
	SupervisorChildShutdownBrutal = -1

	// SupervisorChildShutdownInfinity means that the supervisor will
	// wait for an exit signal as long as child takes
	SupervisorChildShutdownInfinity = 0 // default shutdown behaviour

	// SupervisorChildShutdownTimeout5sec predefined timeout value
	SupervisorChildShutdownTimeout5sec = 5
)
const (
	DefaultCallTimeout = 5
)
const (
	DefaultProcessMailboxSize = 100
)

Variables

var (
	ErrAppAlreadyLoaded   = fmt.Errorf("Application is already loaded")
	ErrAppAlreadyStarted  = fmt.Errorf("Application is already started")
	ErrAppUnknown         = fmt.Errorf("Unknown application name")
	ErrAppIsNotRunning    = fmt.Errorf("Application is not running")
	ErrProcessBusy        = fmt.Errorf("Process is busy")
	ErrNameIsTaken        = fmt.Errorf("Name is taken")
	ErrUnsupportedRequest = fmt.Errorf("Unsupported request")
	ErrTimeout            = fmt.Errorf("Timed out")
	ErrFragmented         = fmt.Errorf("Fragmented data")
	ErrStop               = fmt.Errorf("stop")
)
var (
	ErrNotAProducer = fmt.Errorf("not a producer")
)

Functions

This section is empty.

Types

type Application

type Application struct{}

Application is implementation of ProcessBehaviour interface

func (*Application) Loop added in v1.2.0

func (a *Application) Loop(p *Process, args ...interface{}) string

type ApplicationBehaviour added in v1.2.0

type ApplicationBehaviour interface {
	Load(args ...interface{}) (ApplicationSpec, error)
	Start(process *Process, args ...interface{})
}

ApplicationBehaviour interface

type ApplicationChildSpec

type ApplicationChildSpec struct {
	Child interface{}
	Name  string
	Args  []interface{}
	// contains filtered or unexported fields
}

type ApplicationInfo

type ApplicationInfo struct {
	Name        string
	Description string
	Version     string
	PID         etf.Pid
}

type ApplicationSpec

type ApplicationSpec struct {
	Name         string
	Description  string
	Version      string
	Lifespan     time.Duration
	Applications []string
	Environment  map[string]interface{}
	// Depends		[]
	Children []ApplicationChildSpec
	// contains filtered or unexported fields
}

type ApplicationStartType

type ApplicationStartType = string

type GenServer

type GenServer struct{}

GenServer is implementation of ProcessBehaviour interface for GenServer objects

func (*GenServer) Loop added in v1.2.0

func (gs *GenServer) Loop(p *Process, args ...interface{}) string

type GenServerBehaviour added in v1.2.0

type GenServerBehaviour interface {
	// Init(...) -> state
	Init(process *Process, args ...interface{}) (state interface{})

	// HandleCast -> ("noreply", state) - noreply
	//		         ("stop", reason) - stop with reason
	HandleCast(message etf.Term, state interface{}) (string, interface{})

	// HandleCall -> ("reply", message, state) - reply
	//				 ("noreply", _, state) - noreply
	//		         ("stop", reason, _) - normal stop
	HandleCall(from etf.Tuple, message etf.Term, state interface{}) (string, etf.Term, interface{})

	// HandleInfo -> ("noreply", state) - noreply
	//		         ("stop", reason) - normal stop
	HandleInfo(message etf.Term, state interface{}) (string, interface{})

	Terminate(reason string, state interface{})
}

GenServerBehaviour interface

type GenStage added in v1.2.0

type GenStage struct {
	GenServer
}

func (*GenStage) Ask added in v1.2.0

func (gst *GenStage) Ask(p *Process, subscription GenStageSubscription, count uint) error

Ask makes a demand request for the given subscription. This function must only be used in the cases when a consumer sets a subscription to manual mode using DisableAutoDemand

func (*GenStage) Cancel added in v1.2.0

func (gst *GenStage) Cancel(p *Process, subscription GenStageSubscription, reason string) error

Cancel

func (*GenStage) DisableAutoDemand added in v1.2.0

func (gst *GenStage) DisableAutoDemand(p *Process, subscription GenStageSubscription) error

DisableAutoDemand means that demand must be sent to producers explicitly using Ask method. This mode can be used when a special behaviour is desired.

func (*GenStage) DisableForwardDemand added in v1.2.0

func (gst *GenStage) DisableForwardDemand(p *Process) error

DisableForwardDemand disables forwarding messages to the HandleDemand on a producer stage. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed.

func (*GenStage) EnableAutoDemand added in v1.2.0

func (gst *GenStage) EnableAutoDemand(p *Process, subscription GenStageSubscription) error

EnableAutoDemand enables auto demand mode (this is default mode for the consumer).

func (*GenStage) EnableForwardDemand added in v1.2.0

func (gst *GenStage) EnableForwardDemand(p *Process) error

EnableForwardDemand enables forwarding messages to the HandleDemand on a producer stage. This is default mode for the producer.

func (*GenStage) HandleCall added in v1.2.0

func (gst *GenStage) HandleCall(from etf.Tuple, message etf.Term, state interface{}) (string, etf.Term, interface{})

func (*GenStage) HandleCancel added in v1.2.0

func (gst *GenStage) HandleCancel(subscription GenStageSubscription, reason string, state interface{}) error

func (*GenStage) HandleCanceled added in v1.2.0

func (gst *GenStage) HandleCanceled(subscription GenStageSubscription, reason string, state interface{}) error

func (*GenStage) HandleCast added in v1.2.0

func (gst *GenStage) HandleCast(message etf.Term, state interface{}) (string, interface{})

func (*GenStage) HandleDemand added in v1.2.0

func (gst *GenStage) HandleDemand(subscription GenStageSubscription, count uint, state interface{}) (error, etf.List)

func (*GenStage) HandleEvents added in v1.2.0

func (gst *GenStage) HandleEvents(subscription GenStageSubscription, events etf.List, state interface{}) error

func (*GenStage) HandleGenStageCall added in v1.2.0

func (gst *GenStage) HandleGenStageCall(from etf.Tuple, message etf.Term, state interface{}) (string, etf.Term)

func (*GenStage) HandleGenStageCast added in v1.2.0

func (gst *GenStage) HandleGenStageCast(message etf.Term, state interface{}) string

func (*GenStage) HandleGenStageInfo added in v1.2.0

func (gst *GenStage) HandleGenStageInfo(message etf.Term, state interface{}) string

func (*GenStage) HandleInfo added in v1.2.0

func (gst *GenStage) HandleInfo(message etf.Term, state interface{}) (string, interface{})

func (*GenStage) HandleSubscribe added in v1.2.0

func (gst *GenStage) HandleSubscribe(subscription GenStageSubscription, options GenStageSubscribeOptions,
	state interface{}) error

func (*GenStage) HandleSubscribed added in v1.2.0

func (gst *GenStage) HandleSubscribed(subscription GenStageSubscription, opts GenStageSubscribeOptions, state interface{}) (error, bool)

func (*GenStage) Init added in v1.2.0

func (gst *GenStage) Init(p *Process, args ...interface{}) interface{}

GenServer callbacks

func (*GenStage) InitStage added in v1.2.0

func (gst *GenStage) InitStage(process *Process, args ...interface{}) (GenStageOptions, interface{})

func (*GenStage) SendEvents added in v1.2.0

func (gst *GenStage) SendEvents(p *Process, events etf.List) error

SendEvents sends events for the subscribers

func (*GenStage) SetCancelMode added in v1.2.0

func (gst *GenStage) SetCancelMode(p *Process, subscription GenStageSubscription, cancel GenStageCancelMode) error

SetCancelMode defines how the consumer handles termination of the producer. There are 3 modes:

  • GenStageCancelPermanent (default) - the consumer exits when the producer cancels or exits
  • GenStageCancelTransient - the consumer exits only if the reason is not normal, shutdown, or {shutdown, reason}
  • GenStageCancelTemporary - the consumer never exits
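
The three cancel modes amount to a small decision on the consumer side. The following is a sketch of that decision only, assuming reasons are plain strings (the {shutdown, reason} tuple form is not modelled); cancelMode and consumerExits are local stand-ins, not the package's own types or functions.

```go
package main

import "fmt"

// cancelMode mirrors the numeric values of GenStageCancelMode
// listed in this documentation; it is a local stand-in type.
type cancelMode uint

const (
	cancelPermanent cancelMode = 0
	cancelTransient cancelMode = 1
	cancelTemporary cancelMode = 2
)

// consumerExits reports whether a consumer with the given cancel mode
// exits when its producer cancels or exits with the given reason.
func consumerExits(mode cancelMode, reason string) bool {
	switch mode {
	case cancelPermanent:
		return true
	case cancelTransient:
		return reason != "normal" && reason != "shutdown"
	default: // cancelTemporary
		return false
	}
}

func main() {
	for _, reason := range []string{"normal", "shutdown", "crash"} {
		fmt.Println(reason,
			consumerExits(cancelPermanent, reason),
			consumerExits(cancelTransient, reason),
			consumerExits(cancelTemporary, reason))
	}
}
```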

func (*GenStage) Subscribe added in v1.2.0

func (gst *GenStage) Subscribe(p *Process, producer etf.Term, opts GenStageSubscribeOptions) GenStageSubscription

Subscribe subscribes to the given producer. HandleSubscribed callback will be invoked on a consumer stage once a request for the subscription is sent. If something went wrong on a producer side the callback HandleCancel will be invoked with a reason of cancelation.

func (*GenStage) Terminate added in v1.2.0

func (gst *GenStage) Terminate(reason string, state interface{})

type GenStageBehaviour added in v1.2.0

type GenStageBehaviour interface {

	// InitStage
	InitStage(process *Process, args ...interface{}) (GenStageOptions, interface{})

	// HandleDemand this callback is invoked on a producer stage.
	// The producer that implements this callback must either store the demand or return the requested amount of events.
	// Use `ergo.ErrStop` as an error for a normal shutdown of this process. Any other error value
	// will be used as the reason for an abnormal shutdown of the process.
	HandleDemand(subscription GenStageSubscription, count uint, state interface{}) (error, etf.List)

	// HandleEvents this callback is invoked on a consumer stage.
	// Use `ergo.ErrStop` as an error for a normal shutdown of this process. Any other error value
	// will be used as the reason for an abnormal shutdown of the process.
	HandleEvents(subscription GenStageSubscription, events etf.List, state interface{}) error

	// HandleSubscribe this callback is invoked on a producer stage.
	// Use `ergo.ErrStop` as an error for a normal shutdown of this process. Any other error value
	// will be used as the reason for an abnormal shutdown of the process.
	HandleSubscribe(subscription GenStageSubscription, options GenStageSubscribeOptions, state interface{}) error

	// HandleSubscribed this callback is invoked as a confirmation of the subscription request.
	// Returning false means that demand must be sent to producers explicitly using the Ask method.
	// Returning true means the stage implementation will take care of sending it automatically.
	// Use `ergo.ErrStop` as an error for a normal shutdown of this process. Any other error value
	// will be used as the reason for an abnormal shutdown of the process.
	HandleSubscribed(subscription GenStageSubscription, opts GenStageSubscribeOptions, state interface{}) (error, bool)

	// HandleCancel
	// Invoked when a consumer is no longer subscribed to a producer (invoked on a producer stage).
	// The cancelReason will be a {Cancel: "cancel", Reason: _} if the reason for cancellation
	// was a GenStage.Cancel call. Any other value means the cancellation reason was
	// due to an EXIT.
	// Use `ergo.ErrStop` as an error for a normal shutdown of this process. Any other error value
	// will be used as the reason for an abnormal shutdown of the process.
	HandleCancel(subscription GenStageSubscription, reason string, state interface{}) error

	// HandleCanceled
	// Invoked when a consumer is no longer subscribed to a producer (invoked on a consumer stage).
	// Termination of this stage depends on the cancel mode of the given subscription. With the cancel mode
	// GenStageCancelPermanent, this stage is terminated right after this callback is invoked.
	// With the cancel mode GenStageCancelTransient, it depends on the reason the subscription was canceled.
	// The cancel mode GenStageCancelTemporary keeps this stage alive whatever the reason is.
	HandleCanceled(subscription GenStageSubscription, reason string, state interface{}) error

	// HandleGenStageCall this callback is invoked on Process.Call. This method is optional
	// for the implementation
	HandleGenStageCall(from etf.Tuple, message etf.Term, state interface{}) (string, etf.Term)
	// HandleGenStageCast this callback is invoked on Process.Cast. This method is optional
	// for the implementation
	HandleGenStageCast(message etf.Term, state interface{}) string
	// HandleGenStageInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleGenStageInfo(message etf.Term, state interface{}) string
}

GenStageBehaviour interface for the GenStage implementation

type GenStageCancelMode added in v1.2.0

type GenStageCancelMode uint
const (
	GenStageCancelPermanent GenStageCancelMode = 0
	GenStageCancelTransient GenStageCancelMode = 1
	GenStageCancelTemporary GenStageCancelMode = 2
)

type GenStageCancelReason added in v1.2.0

type GenStageCancelReason struct {
	Cancel string
	Reason string
}

type GenStageDispatchItem added in v1.2.0

type GenStageDispatchItem struct {
	// contains filtered or unexported fields
}

type GenStageDispatcher added in v1.2.0

type GenStageDispatcher int

type GenStageDispatcherBehaviour added in v1.2.0

type GenStageDispatcherBehaviour interface {
	// InitStageDispatcher(opts)
	Init(opts GenStageOptions) interface{}

	// Ask called every time a consumer sends demand
	Ask(subscription GenStageSubscription, count uint, state interface{})

	// Cancel called every time a subscription is cancelled or the consumer goes down.
	Cancel(subscription GenStageSubscription, state interface{})

	// Dispatch called every time a producer wants to dispatch an event.
	Dispatch(events etf.List, state interface{}) []GenStageDispatchItem

	// Subscribe called every time the producer gets a new subscriber
	Subscribe(subscription GenStageSubscription, opts GenStageSubscribeOptions, state interface{}) error
}

GenStageDispatcherBehaviour defines the interface for a dispatcher implementation. To create a custom dispatcher, implement this interface and use it in GenStageOptions as the Dispatcher.

func CreateGenStageDispatcherBroadcast added in v1.2.0

func CreateGenStageDispatcherBroadcast() GenStageDispatcherBehaviour

CreateGenStageDispatcherBroadcast creates a dispatcher that accumulates demand from all consumers before broadcasting events to all of them. This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer. The demand is only sent upstream once all consumers ask for data.
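
The guarantee described above (never exceed any consumer's demand) means the number of events that can be broadcast at once is bounded by the smallest pending demand. The following is a simplified model of that rule, not the dispatcher shipped in this package; broadcastable and broadcast are hypothetical names, and consumers are identified by plain strings.

```go
package main

import "fmt"

// broadcastable returns how many events can go to every consumer
// without exceeding anyone's pending demand, i.e. the minimum demand.
// It returns 0 when there are no consumers.
func broadcastable(demand map[string]uint) uint {
	var lowest uint
	first := true
	for _, d := range demand {
		if first || d < lowest {
			lowest, first = d, false
		}
	}
	return lowest
}

// broadcast delivers the same prefix of events to all consumers and
// reduces each consumer's pending demand accordingly. The remaining
// events would have to wait until every consumer asks again.
func broadcast(demand map[string]uint, events []string) []string {
	n := broadcastable(demand)
	if uint(len(events)) < n {
		n = uint(len(events))
	}
	for id := range demand {
		demand[id] -= n
	}
	return events[:n]
}

func main() {
	demand := map[string]uint{"a": 3, "b": 1}
	sent := broadcast(demand, []string{"e1", "e2", "e3"})
	fmt.Println("sent:", sent, "remaining demand:", demand)
}
```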

func CreateGenStageDispatcherDemand added in v1.2.0

func CreateGenStageDispatcherDemand() GenStageDispatcherBehaviour

CreateGenStageDispatcherDemand creates a dispatcher that sends batches to the highest demand. This is the default dispatcher used by GenStage. In order to avoid greedy consumers, it is recommended that all consumers have exactly the same maximum demand.

func CreateGenStageDispatcherPartition added in v1.2.0

func CreateGenStageDispatcherPartition(n uint, hash func(etf.Term) int) GenStageDispatcherBehaviour

CreateGenStageDispatcherPartition creates a dispatcher that sends events according to partitions. The number of partitions 'n' must be > 0. 'hash' should return a number within the range [0,n). A value outside this range causes the event to be discarded. If 'hash' is nil, a random partition is used for every event.
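
A hash function that always stays within [0,n) could look like the sketch below. It is an illustration only: Term is a local stand-in for etf.Term (assumed here to be an empty interface), partitionHash is a hypothetical helper, and hashing the printed form of the term with FNV-1a is just one possible choice.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Term stands in for etf.Term (assumed to be an empty interface).
type Term interface{}

// partitionHash returns a function that maps any term to a partition
// in [0, n) by hashing its printed form with FNV-1a. n must be > 0.
func partitionHash(n uint) func(Term) int {
	return func(t Term) int {
		h := fnv.New32a()
		fmt.Fprint(h, t) // hash.Hash32 is an io.Writer
		return int(h.Sum32() % uint32(n))
	}
}

func main() {
	hash := partitionHash(4)
	for _, ev := range []Term{"order-1", "order-2", 42, 3.14} {
		fmt.Printf("%v -> partition %d\n", ev, hash(ev))
	}
}
```

Under the signature shown above, the result of partitionHash(n) has the shape expected for the second argument of CreateGenStageDispatcherPartition once Term is replaced by etf.Term.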

type GenStageOptions added in v1.2.0

type GenStageOptions struct {

	// If this stage acts as a consumer, you can define the producers
	// this stage should subscribe to.
	// SubscribeTo is a list of GenStageSubscribeTo. Each element represents
	// a producer (etf.Pid or registered name) and subscription options.
	SubscribeTo []GenStageSubscribeTo

	// DisableForwarding. the demand is always forwarded to the HandleDemand callback.
	// When this options is set to 'true', demands are accumulated until mode is
	// set back to 'false' using DisableDemandAccumulating method
	DisableForwarding bool

	// BufferSize the size of the buffer to store events without demand.
	// default value = defaultDispatcherBufferSize
	BufferSize uint

	// BufferKeepLast defines whether the first or last entries should be
	// kept on the buffer in case the buffer size is exceeded.
	BufferKeepLast bool

	Dispatcher GenStageDispatcherBehaviour
}

GenStageOptions defines the GenStage configuration returned from the InitStage callback. Some options are specific to the chosen stage mode while others are shared across all types.

type GenStageSubscribeOptions added in v1.2.0

type GenStageSubscribeOptions struct {
	MinDemand uint `etf:"min_demand"`
	MaxDemand uint `etf:"max_demand"`
	// The stage implementation will take care of automatically sending
	// demand to producer (as a default behaviour). You can disable it
	// setting ManualDemand to true
	ManualDemand bool `etf:"manual"`
	// What should happen with the consumer if the producer has terminated
	// GenStageCancelPermanent the consumer exits when the producer cancels or exits.
	// GenStageCancelTransient the consumer exits only if reason is not "normal",
	// "shutdown", or {"shutdown", _}
	// GenStageCancelTemporary the consumer never exits
	Cancel GenStageCancelMode `etf:"cancel"`

	// Partition defines the number of the partition this subscription should belong to.
	// This option is used in the DispatcherPartition
	Partition uint `etf:"partition"`

	// Extra is intended to be a custom set of options for the custom implementation
	// of GenStageDispatcherBehaviour
	Extra etf.Term `etf:"extra"`
}

type GenStageSubscribeTo added in v1.2.0

type GenStageSubscribeTo struct {
	Producer etf.Term
	Options  GenStageSubscribeOptions
}

type GenStageSubscription added in v1.2.0

type GenStageSubscription struct {
	Pid etf.Pid
	Ref etf.Ref
}

type Node

type Node struct {
	Cookie string

	Stop context.CancelFunc

	StartedAt time.Time

	FullName string
	// contains filtered or unexported fields
}

Node instance of created node using CreateNode

func CreateNode

func CreateNode(name string, cookie string, opts NodeOptions) *Node

CreateNode creates a new node with the given name and cookie string

func CreateNodeWithContext

func CreateNodeWithContext(ctx context.Context, name string, cookie string, opts NodeOptions) *Node

CreateNodeWithContext creates a new node with the specified context, name, and cookie string

func (*Node) AddStaticRoute added in v1.2.0

func (n *Node) AddStaticRoute(name string, port uint16) error

AddStaticRoute adds static route record into the EPMD client

func (*Node) ApplicationLoad

func (n *Node) ApplicationLoad(app interface{}, args ...interface{}) error

ApplicationLoad loads the application specification for an application into the node. It also loads the application specifications for any included applications

func (*Node) ApplicationStart

func (n *Node) ApplicationStart(appName string, args ...interface{}) (*Process, error)

ApplicationStart starts an Application with start type ApplicationStartTemporary. If the application terminates, this is reported but no other applications are terminated.

func (*Node) ApplicationStartPermanent

func (n *Node) ApplicationStartPermanent(appName string, args ...interface{}) (*Process, error)

ApplicationStartPermanent starts the Application with start type ApplicationStartPermanent. If this application terminates, all other applications and the entire node are also terminated

func (*Node) ApplicationStartTransient

func (n *Node) ApplicationStartTransient(appName string, args ...interface{}) (*Process, error)

ApplicationStartTransient starts the Application with start type ApplicationStartTransient. If a transient application terminates with reason 'normal', this is reported and no other applications are terminated. Otherwise, all other applications and the node are terminated
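
The three start types above differ only in what happens when the application terminates. The following is a minimal sketch of that decision, not the framework's code: StartType, its constants and stopsNode are hypothetical local stand-ins declared for illustration.

```go
package main

import "fmt"

// StartType is a local stand-in for the application start type.
// It is not the type used by the package.
type StartType string

const (
	Temporary StartType = "temporary"
	Transient StartType = "transient"
	Permanent StartType = "permanent"
)

// stopsNode reports whether the termination of an application started with
// the given type and exit reason also terminates the other applications
// and the node, following the descriptions in the text.
func stopsNode(t StartType, reason string) bool {
	switch t {
	case Permanent:
		// Any termination takes the node down.
		return true
	case Transient:
		// Only an abnormal termination takes the node down.
		return reason != "normal"
	default:
		// Temporary: the termination is only reported.
		return false
	}
}

func main() {
	for _, t := range []StartType{Temporary, Transient, Permanent} {
		for _, reason := range []string{"normal", "crash"} {
			fmt.Printf("%-9s reason=%-6s stops node: %v\n", t, reason, stopsNode(t, reason))
		}
	}
}
```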

func (*Node) ApplicationStop

func (n *Node) ApplicationStop(name string) error

ApplicationStop stops a running application

func (*Node) ApplicationUnload

func (n *Node) ApplicationUnload(appName string) error

ApplicationUnload unloads the application specification for Application from the node. It also unloads the application specifications for any included applications.

func (*Node) GetApplicationInfo

func (n *Node) GetApplicationInfo(name string) (ApplicationInfo, error)

GetApplicationInfo returns information about application

func (*Node) GetPeerList

func (n *Node) GetPeerList() []string

GetPeerList returns list of connected nodes

func (*Node) GetProcessByName

func (n *Node) GetProcessByName(name string) *Process

GetProcessByName returns Process associated with given name

func (*Node) GetProcessByPid

func (n *Node) GetProcessByPid(pid etf.Pid) *Process

GetProcessByPid returns Process by given pid

func (*Node) GetProcessList

func (n *Node) GetProcessList() []*Process

GetProcessList returns a slice of running processes

func (*Node) IsAlive

func (n *Node) IsAlive() bool

IsAlive returns true if node is running

func (*Node) IsProcessAlive

func (n *Node) IsProcessAlive(pid etf.Pid) bool

IsProcessAlive returns true if the process with given pid is alive

func (*Node) LoadedApplications

func (n *Node) LoadedApplications() []ApplicationInfo

LoadedApplications returns a list with information about the applications that were loaded using ApplicationLoad

func (*Node) MakeRef

func (n *Node) MakeRef() (ref etf.Ref)

MakeRef returns atomic reference etf.Ref within this node

func (*Node) ProcessInfo

func (n *Node) ProcessInfo(pid etf.Pid) (ProcessInfo, error)

ProcessInfo returns the details about given Pid

func (*Node) ProvideRPC

func (n *Node) ProvideRPC(module string, function string, fun rpcFunction) error

ProvideRPC registers the given module/function as an RPC method

func (*Node) Register

func (n *Node) Register(name string, pid etf.Pid) error

Register associates the name with the pid

func (*Node) RemoveStaticRoute added in v1.2.0

func (n *Node) RemoveStaticRoute(name string)

RemoveStaticRoute removes static route record from the EPMD client

func (*Node) ResolvePort added in v1.2.0

func (n *Node) ResolvePort(name string) int

ResolvePort resolves port number for the given name. Returns -1 if not found

func (*Node) RevokeRPC

func (n *Node) RevokeRPC(module, function string) error

RevokeRPC unregisters the given module/function

func (*Node) Spawn

func (n *Node) Spawn(name string, opts ProcessOptions, object interface{}, args ...interface{}) (*Process, error)

Spawn creates a new process

func (*Node) Unregister

func (n *Node) Unregister(name string)

func (*Node) VersionERTS

func (n *Node) VersionERTS() string

func (*Node) VersionOTP

func (n *Node) VersionOTP() int

func (*Node) Wait

func (n *Node) Wait()

Wait waits until the node has stopped

func (*Node) WaitWithTimeout

func (n *Node) WaitWithTimeout(d time.Duration) error

WaitWithTimeout waits until the node has stopped. Returns ErrTimeout if the given timeout is exceeded
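
The wait-or-time-out behaviour described above is commonly built from a channel and a timer. The sketch below shows that general pattern with the standard library only. It is not the package's implementation: errTimeout and waitWithTimeout are hypothetical stand-ins, and the done channel plays the role of the node's stop signal.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a local stand-in for the package's ErrTimeout.
var errTimeout = errors.New("timeout")

// waitWithTimeout blocks until done is closed or d elapses,
// whichever happens first.
func waitWithTimeout(done <-chan struct{}, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errTimeout
	}
}

func main() {
	stopped := make(chan struct{})

	// Nothing has stopped yet, so the timer wins.
	fmt.Println(waitWithTimeout(stopped, 10*time.Millisecond))

	// Once the stop signal is given, the wait returns immediately.
	close(stopped)
	fmt.Println(waitWithTimeout(stopped, time.Second))
}
```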

func (*Node) WhichApplications

func (n *Node) WhichApplications() []ApplicationInfo

WhichApplications returns a list with information about the applications that are currently running.

type NodeOptions

type NodeOptions struct {
	ListenRangeBegin       uint16
	ListenRangeEnd         uint16
	Hidden                 bool
	EPMDPort               uint16
	DisableEPMDServer      bool
	SendQueueLength        int
	RecvQueueLength        int
	FragmentationUnit      int
	DisableHeaderAtomCache bool
	TLSmode                TLSmodeType
	TLScrtServer           string
	TLSkeyServer           string
	TLScrtClient           string
	TLSkeyClient           string
}

NodeOptions struct with bootstrapping options for CreateNode

type Process

type Process struct {
	sync.RWMutex

	Context context.Context
	Kill    context.CancelFunc
	Exit    ProcessExitFunc

	Node *Node
	// contains filtered or unexported fields
}

func (*Process) Call

func (p *Process) Call(to interface{}, message etf.Term) (etf.Term, error)

Call makes outgoing sync request in fashion of 'gen_call'. 'to' can be Pid, registered local name or a tuple {RegisteredName, NodeName}

func (*Process) CallRPC

func (p *Process) CallRPC(node, module, function string, args ...etf.Term) (etf.Term, error)

CallRPC evaluates an RPC call with the given node/MFA

func (*Process) CallRPCWithTimeout

func (p *Process) CallRPCWithTimeout(timeout int, node, module, function string, args ...etf.Term) (etf.Term, error)

CallRPCWithTimeout evaluates an RPC call with the given node/MFA and timeout

func (*Process) CallWithTimeout

func (p *Process) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)

CallWithTimeout makes an outgoing sync request in the fashion of 'gen_call' with the given timeout

func (*Process) Cast

func (p *Process) Cast(to interface{}, message etf.Term)

Cast sends a message in fashion of 'gen_cast'. 'to' can be a Pid, registered local name or a tuple {RegisteredName, NodeName}

func (*Process) CastAfter

func (p *Process) CastAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc

CastAfter simple wrapper for SendAfter to send '$gen_cast' message

func (*Process) CastRPC

func (p *Process) CastRPC(node, module, function string, args ...etf.Term)

CastRPC evaluates an RPC cast with the given node/MFA

func (*Process) DemonitorNode

func (p *Process) DemonitorNode(ref etf.Ref)

DemonitorNode removes monitor

func (*Process) DemonitorProcess

func (p *Process) DemonitorProcess(ref etf.Ref) bool

DemonitorProcess removes monitor. Returns false if the given reference 'ref' wasn't found

func (*Process) GetChildren

func (p *Process) GetChildren() []etf.Pid

GetChildren returns list of children pid (Application, Supervisor)

func (*Process) GetEnv

func (p *Process) GetEnv(name string) interface{}

GetEnv returns value associated with given environment name.

func (*Process) GetState

func (p *Process) GetState() string

GetState returns string representation of the process state (GenServer)

func (*Process) GetTrapExit added in v1.2.0

func (p *Process) GetTrapExit() bool

GetTrapExit returns whether the trap was enabled on this process

func (*Process) Info

func (p *Process) Info() ProcessInfo

Info returns detailed information about the process

func (*Process) IsAlive

func (p *Process) IsAlive() bool

IsAlive returns whether the process is alive

func (*Process) Link

func (p *Process) Link(with etf.Pid)

Link creates a link between the calling process and another process

func (*Process) ListEnv

func (p *Process) ListEnv() map[string]interface{}

ListEnv returns a map of the configured environment variables. The process environment is also inherited from the environment variables of the groupLeader (if the process was started as a child of an Application/Supervisor)

func (*Process) MonitorNode

func (p *Process) MonitorNode(name string) etf.Ref

MonitorNode creates monitor between the current process and node. If Node fails or does not exist, the message {nodedown, Node} is delivered to the process.

func (*Process) MonitorProcess

func (p *Process) MonitorProcess(process interface{}) etf.Ref

MonitorProcess creates a monitor between the processes. The 'process' value can be an etf.Pid, a registered local name etf.Atom, or a remote registered name etf.Tuple{Name etf.Atom, Node etf.Atom}.

When a process monitor is triggered, a 'DOWN' message is sent with the following pattern: {'DOWN', MonitorRef, Type, Object, Info}, where Info is one of the following:

  • the exit reason of the process
  • 'noproc' (process did not exist at the time of monitor creation)
  • 'noconnection' (no connection to the node where the monitored process resides)

Note: The monitor request is an asynchronous signal. That is, it takes time before the signal reaches its destination.
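
A receiver of the 'DOWN' message has to check the tuple shape before reading the Info element. The sketch below illustrates that check using local stand-in types: Atom, Tuple and downReason are declared here for illustration only and are assumptions, not the etf types or any function of this package.

```go
package main

import "fmt"

// Atom and Tuple are local stand-ins for the term types,
// declared here so the sketch is self-contained.
type Atom string
type Tuple []interface{}

// downReason returns the Info element of a message shaped like
// {'DOWN', MonitorRef, Type, Object, Info}. The boolean is false
// when the message does not match that pattern.
func downReason(msg interface{}) (interface{}, bool) {
	t, ok := msg.(Tuple)
	if !ok || len(t) != 5 {
		return nil, false
	}
	if tag, ok := t[0].(Atom); !ok || tag != "DOWN" {
		return nil, false
	}
	return t[4], true
}

func main() {
	// Placeholder strings stand in for the reference and the pid.
	msg := Tuple{Atom("DOWN"), "ref-1", Atom("process"), "pid-1", Atom("noproc")}
	if reason, ok := downReason(msg); ok {
		fmt.Println("monitored process is down:", reason)
	}
}
```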

func (*Process) Name

func (p *Process) Name() string

Name returns registered name of the process

func (*Process) Self

func (p *Process) Self() etf.Pid

Self returns self Pid

func (*Process) Send

func (p *Process) Send(to interface{}, message etf.Term)

Send sends a message. 'to' can be a Pid, registered local name or a tuple {RegisteredName, NodeName}

func (*Process) SendAfter

func (p *Process) SendAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc

SendAfter starts a timer. When the timer expires, the message is sent to the process identified by 'to'. 'to' can be a Pid, registered local name or a tuple {RegisteredName, NodeName}. Returns a cancel function that can be used to discard the pending message
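
Returning a context.CancelFunc from a delayed send is a pattern that can be reproduced with the standard library. The sketch below is one way to do it and is not the package's implementation: sendAfter is a hypothetical helper, and a buffered channel stands in for the destination process mailbox.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendAfter delivers msg to mailbox once the delay has passed, unless the
// returned cancel function is called first. The mailbox should be buffered
// or have an active receiver, otherwise the delivery blocks.
func sendAfter(mailbox chan<- string, msg string, after time.Duration) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		timer := time.NewTimer(after)
		defer timer.Stop()

		select {
		case <-timer.C:
			mailbox <- msg
		case <-ctx.Done():
			// Cancelled before the timer expired: nothing is sent.
		}
	}()
	return cancel
}

func main() {
	mailbox := make(chan string, 2)

	// This message is discarded because it is cancelled right away.
	cancel := sendAfter(mailbox, "discarded", 50*time.Millisecond)
	cancel()

	// This message is delivered after 10ms.
	sendAfter(mailbox, "delivered", 10*time.Millisecond)

	time.Sleep(100 * time.Millisecond)
	pending := len(mailbox)
	fmt.Println("pending messages:", pending)
	fmt.Println("first message:", <-mailbox)
}
```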

func (*Process) SetEnv

func (p *Process) SetEnv(name string, value interface{})

SetEnv sets the environment variable with the given name

func (*Process) SetTrapExit added in v1.2.0

func (p *Process) SetTrapExit(trap bool)

SetTrapExit enables/disables trapping of exit signals for the process

func (*Process) Unlink

func (p *Process) Unlink(with etf.Pid)

Unlink removes the link, if there is one, between the calling process and the process referred to by 'with'.

func (*Process) Wait

func (p *Process) Wait()

Wait waits until the process has stopped

func (*Process) WaitWithTimeout

func (p *Process) WaitWithTimeout(d time.Duration) error

WaitWithTimeout waits until the process has stopped. Returns ErrTimeout if the given timeout is exceeded

type ProcessBehaviour

type ProcessBehaviour interface {
	Loop(*Process, ...interface{}) string // method which implements control flow of process
}

ProcessBehaviour interface contains the methods you should implement to create your own process behaviour

type ProcessExitFunc

type ProcessExitFunc func(from etf.Pid, reason string)

ProcessExitFunc initiates a graceful stop of the process

type ProcessInfo

type ProcessInfo struct {
	PID             etf.Pid
	Name            string
	CurrentFunction string
	Status          string
	MessageQueueLen int
	Links           []etf.Pid
	Monitors        []etf.Pid
	MonitoredBy     []etf.Pid
	Dictionary      etf.Map
	TrapExit        bool
	GroupLeader     etf.Pid
	Reductions      uint64
}

ProcessInfo struct with process details

type ProcessOptions

type ProcessOptions struct {
	MailboxSize uint16
	GroupLeader *Process
	// contains filtered or unexported fields
}

type ProcessType

type ProcessType = string

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor is an implementation of the ProcessBehaviour interface

func (*Supervisor) Loop added in v1.2.0

func (sv *Supervisor) Loop(svp *Process, args ...interface{}) string

func (*Supervisor) StartChild

func (sv *Supervisor) StartChild(parent *Process, specName string, args ...interface{}) (etf.Pid, error)

StartChild dynamically starts a child process using the child spec with the given name, as defined by the Init call. The created process uses the same object (GenServer/Supervisor) you defined in the spec as Child, since the spec keeps a pointer to it. You can therefore use this object as a shared one among the processes created from this spec.

func (*Supervisor) StartChildWithSpec

func (sv *Supervisor) StartChildWithSpec(parent *Process, spec SupervisorChildSpec, args ...interface{}) (etf.Pid, error)

StartChildWithSpec dynamically starts a child process with given child spec

type SupervisorBehaviour added in v1.2.0

type SupervisorBehaviour interface {
	Init(args ...interface{}) SupervisorSpec
}

SupervisorBehaviour interface

type SupervisorChild

type SupervisorChild = string

type SupervisorChildRestart

type SupervisorChildRestart = string

type SupervisorChildShutdown

type SupervisorChildShutdown int

SupervisorChildShutdown is an integer timeout value. The supervisor tells the child process to terminate by calling the Stop method and then waits for an exit signal with reason shutdown back from the child process. If no exit signal is received within the specified number of seconds, the child process is unconditionally terminated using the Kill method. There are predefined values:

SupervisorChildShutdownBrutal (-1)
SupervisorChildShutdownInfinity (0) - default value
SupervisorChildShutdownTimeout5sec (5)
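
The stop-then-wait-then-kill sequence described above can be sketched with the standard library as follows. This is an illustration under stated assumptions, not the supervisor's code: Shutdown, its constants and shutdownChild are hypothetical local names, the stop and kill callbacks stand in for the Stop and Kill methods, and the unit parameter exists only so the demo does not have to wait whole seconds.

```go
package main

import (
	"fmt"
	"time"
)

// Shutdown is a local stand-in for the shutdown value described in the text.
type Shutdown int

const (
	ShutdownBrutal   Shutdown = -1 // kill immediately
	ShutdownInfinity Shutdown = 0  // wait for the exit signal without a limit
)

// shutdownChild asks the child to stop and waits for its exit signal.
// A positive s is a timeout counted in units of `unit` (seconds in the text).
// The return value says how the child ended: "stopped" or "killed".
func shutdownChild(s Shutdown, unit time.Duration, stop, kill func(), exited <-chan struct{}) string {
	if s == ShutdownBrutal {
		kill()
		return "killed"
	}

	stop()
	if s == ShutdownInfinity {
		<-exited
		return "stopped"
	}

	timer := time.NewTimer(time.Duration(s) * unit)
	defer timer.Stop()

	select {
	case <-exited:
		return "stopped"
	case <-timer.C:
		// No exit signal arrived in time: terminate unconditionally.
		kill()
		return "killed"
	}
}

func main() {
	// A cooperative child exits as soon as it is asked to stop.
	exited := make(chan struct{})
	fmt.Println(shutdownChild(5, time.Millisecond, func() { close(exited) }, func() {}, exited))

	// An unresponsive child never exits and is killed after the timeout.
	fmt.Println(shutdownChild(5, time.Millisecond, func() {}, func() {}, make(chan struct{})))
}
```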

type SupervisorChildSpec

type SupervisorChildSpec struct {
	Name     string
	Child    interface{}
	Args     []interface{}
	Restart  SupervisorChildRestart
	Shutdown SupervisorChildShutdown
	// contains filtered or unexported fields
}

type SupervisorSpec

type SupervisorSpec struct {
	Name     string
	Children []SupervisorChildSpec
	Strategy SupervisorStrategy
	// contains filtered or unexported fields
}

type SupervisorStrategy

type SupervisorStrategy struct {
	Type      SupervisorStrategyType
	Intensity uint16
	Period    uint16
}

type SupervisorStrategyType

type SupervisorStrategyType = string

type TLSmodeType added in v1.2.0

type TLSmodeType string

TLSmodeType should be one of TLSmodeDisabled (default), TLSmodeAuto or TLSmodeStrict

const (

	// TLSmodeDisabled no TLS encryption
	TLSmodeDisabled TLSmodeType = ""
	// TLSmodeAuto generate self-signed certificate
	TLSmodeAuto TLSmodeType = "auto"
	// TLSmodeStrict with validation certificate
	TLSmodeStrict TLSmodeType = "strict"
)

Directories

Path Synopsis
cmd
examples
