gen

package
v1.999.224
Published: May 1, 2023 License: MIT Imports: 18 Imported by: 35

README

Generic behaviors

Server

Generic server behavior.

Example: gen.Server

Supervisor

Generic supervisor behavior.

A supervisor is responsible for starting, stopping, and monitoring its child processes. The basic idea of a supervisor is to keep its child processes alive by restarting them when necessary.

Example: gen.Supervisor

Application

Generic application behavior.

Example: gen.Application

Pool

Generic pool of workers.

This behavior implements a basic design pattern with a pool of workers. All messages and requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. A worker process is automatically restarted on termination.

Example: gen.Pool
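
The round-robin forwarding described above can be sketched with the standard library alone. This is a minimal illustration, not the gen.Pool implementation: the roundRobin type and its pick method are hypothetical names introduced here.

```go
package main

import "fmt"

// roundRobin is a hypothetical dispatcher: it hands out worker indexes in a
// fixed cyclic order, the way the pool description says requests are spread
// across workers.
type roundRobin struct {
	workers int // number of workers in the pool
	next    int // index of the worker that receives the next message
}

// pick returns the index of the worker for the next message and advances the
// cursor, wrapping around after the last worker.
func (r *roundRobin) pick() int {
	i := r.next
	r.next = (r.next + 1) % r.workers
	return i
}

func main() {
	rr := &roundRobin{workers: 3}
	for msg := 0; msg < 5; msg++ {
		fmt.Printf("message %d -> worker %d\n", msg, rr.pick())
	}
}
```

With three workers, consecutive messages go to workers 0, 1, 2 and then back to 0.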

Web

Web API Gateway behavior.

The Web API Gateway pattern is also sometimes known as the "Backend For Frontend" (BFF) because you build it while thinking about the needs of the client app. Therefore, BFF sits between the client apps and the microservices. It acts as a reverse proxy, routing requests from clients to services.

Example: gen.Web

TCP

Socket acceptor pool for TCP protocols.

This behavior aims to provide everything you need to accept TCP connections and process packets with a small code base and low latency while being easy to use.

Example: gen.TCP

UDP

UDP acceptor pool for UDP protocols.

This behavior provides the same feature set as TCP, but for handling UDP packets using a pool of handlers.

Example: gen.UDP

Stage

Generic stage behavior (originated from Elixir's GenStage).

This is an abstraction built on top of gen.Server that provides a simple way to create a distributed Producer/Consumer architecture while automatically managing backpressure. This implementation is fully compatible with Elixir's GenStage.

Example: gen.Stage
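
Backpressure in a Producer/Consumer pipeline is usually driven by demand: the consumer states how many events it can take, and the producer never sends more than that. The sketch below shows only that idea. The producer type and handleDemand method are hypothetical and are not the gen.Stage API.

```go
package main

import "fmt"

// producer is a hypothetical stand-in for a producer stage. It holds buffered
// events and releases only as many as the consumer asked for.
type producer struct {
	events []int
}

// handleDemand returns at most `demand` events and removes them from the
// buffer. A non-positive demand releases nothing.
func (p *producer) handleDemand(demand int) []int {
	if demand < 0 {
		demand = 0
	}
	if demand > len(p.events) {
		demand = len(p.events)
	}
	out := p.events[:demand]
	p.events = p.events[demand:]
	return out
}

func main() {
	p := &producer{events: []int{1, 2, 3, 4, 5}}
	fmt.Println(p.handleDemand(2))  // the consumer can take two events
	fmt.Println(p.handleDemand(10)) // more demand than events: the rest is sent
}
```

Because the producer only reacts to demand, a slow consumer slows the whole pipeline instead of being flooded.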

Saga

Generic saga behavior.

It implements the Saga design pattern: a sequence of transactions that updates each service's state and publishes the result (or cancels the transaction, or triggers the next transaction step). gen.Saga also provides interim results (usable as transaction progress or as part of pipeline processing), a time deadline (to limit the transaction lifespan), and two-phase commit (to make a distributed transaction atomic).

Example: gen.Saga

Raft

Generic raft behavior.

It is an improved implementation of the Raft consensus algorithm. The key improvement is the use of a quorum under the hood to manage the leader election process and make the Raft cluster more reliable. This implementation supports quorums of 3, 5, 7, 9, or 11 members.

Example: gen.Raft
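
The supported quorum sizes can be made concrete with a small sketch. The selection rule used here (take the largest supported size that fits the number of available peers) is an assumption made for illustration and is not confirmed by the package documentation. The names supportedQuorums, quorumSize, and majority are hypothetical.

```go
package main

import "fmt"

// supportedQuorums lists the quorum sizes named in the description above.
var supportedQuorums = []int{3, 5, 7, 9, 11}

// quorumSize returns the largest supported quorum that can be formed from
// `peers` cluster members, or 0 if there are fewer than 3 peers.
// Assumption: "largest that fits" is an illustrative rule only.
func quorumSize(peers int) int {
	size := 0
	for _, q := range supportedQuorums {
		if q <= peers {
			size = q
		}
	}
	return size
}

// majority returns the number of votes that form a strict majority in a
// quorum of the given (odd) size.
func majority(quorum int) int {
	return quorum/2 + 1
}

func main() {
	for _, peers := range []int{2, 4, 8, 20} {
		q := quorumSize(peers)
		fmt.Printf("peers=%d quorum=%d majority=%d\n", peers, q, majority(q))
	}
}
```

Odd sizes are what make a strict majority unambiguous: a quorum of 5 needs 3 votes, a quorum of 11 needs 6.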

Documentation

Index

Constants

const (

	// ApplicationStartPermanent If a permanent application terminates,
	// all other applications and the runtime system (node) are also terminated.
	ApplicationStartPermanent ApplicationStartType = "permanent"

	// ApplicationStartTemporary If a temporary application terminates,
	// this is reported but no other applications are terminated.
	ApplicationStartTemporary ApplicationStartType = "temporary"

	// ApplicationStartTransient If a transient application terminates
	// with reason normal, this is reported but no other applications are
	// terminated. If a transient application terminates abnormally, that
	// is with any other reason than normal, all other applications and
	// the runtime system (node) are also terminated.
	ApplicationStartTransient ApplicationStartType = "transient"

	// EnvKeyAppSpec
	EnvKeyAppSpec EnvKey = "ergo:AppSpec"
)
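
The three start types differ only in whether the termination of the application also takes down the node. The sketch below restates the comments above as a single decision function. nodeStops is a hypothetical helper, and the string literals stand in for the ApplicationStartType constants.

```go
package main

import "fmt"

// nodeStops reports whether the termination of an application with the given
// start type and exit reason also terminates the other applications and the
// node, following the constant descriptions above.
func nodeStops(startType, reason string) bool {
	switch startType {
	case "permanent":
		// Any termination of a permanent application stops the node.
		return true
	case "transient":
		// Only an abnormal termination stops the node.
		return reason != "normal"
	default:
		// "temporary": the termination is reported, nothing else stops.
		return false
	}
}

func main() {
	for _, st := range []string{"permanent", "temporary", "transient"} {
		fmt.Printf("%-9s normal=%v crash=%v\n", st, nodeStops(st, "normal"), nodeStops(st, "crash"))
	}
}
```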
const (
	DefaultRaftGetTimeout    = 5 // in seconds
	DefaultRaftAppendTimeout = 5 // in seconds
	DefaultRaftHeartbeat     = 3 // in seconds
)
const (

	// SupervisorRestartIntensity
	SupervisorRestartIntensity = uint16(10)

	// SupervisorRestartPeriod
	SupervisorRestartPeriod = uint16(10)

	// SupervisorStrategyOneForOne If one child process terminates and is to be restarted, only
	// that child process is affected. This is the default restart strategy.
	SupervisorStrategyOneForOne = SupervisorStrategyType("one_for_one")

	// SupervisorStrategyOneForAll If one child process terminates and is to be restarted, all other
	// child processes are terminated and then all child processes are restarted.
	SupervisorStrategyOneForAll = SupervisorStrategyType("one_for_all")

	// SupervisorStrategyRestForOne If one child process terminates and is to be restarted,
	// the 'rest' of the child processes (that is, the child
	// processes after the terminated child process in the start order)
	// are terminated. Then the terminated child process and all
	// child processes after it are restarted
	SupervisorStrategyRestForOne = SupervisorStrategyType("rest_for_one")

	// SupervisorStrategySimpleOneForOne A simplified one_for_one supervisor, where all
	// child processes are dynamically added instances
	// of the same process type, that is, running the same code.
	SupervisorStrategySimpleOneForOne = SupervisorStrategyType("simple_one_for_one")

	// SupervisorStrategyRestartPermanent child process is always restarted
	SupervisorStrategyRestartPermanent = SupervisorStrategyRestart("permanent")

	// SupervisorStrategyRestartTemporary child process is never restarted
	// (not even when the supervisor restart strategy is rest_for_one
	// or one_for_all and a sibling death causes the temporary process
	// to be terminated)
	SupervisorStrategyRestartTemporary = SupervisorStrategyRestart("temporary")

	// SupervisorStrategyRestartTransient child process is restarted only if
	// it terminates abnormally, that is, with an exit reason other
	// than normal or shutdown.
	SupervisorStrategyRestartTransient = SupervisorStrategyRestart("transient")
)
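
The strategy and restart constants answer two separate questions: which children are affected when one child terminates, and whether a given child is restarted at all. The sketch below models both as plain functions over child indexes in start order. affected and shouldRestart are hypothetical helpers, and the string literals stand in for the constants above.

```go
package main

import "fmt"

// affected returns the indexes (in start order) of the children that are
// terminated and restarted when child `failed` terminates, out of n children.
func affected(strategy string, n, failed int) []int {
	switch strategy {
	case "one_for_all":
		all := make([]int, n)
		for i := range all {
			all[i] = i
		}
		return all
	case "rest_for_one":
		var rest []int
		for i := failed; i < n; i++ {
			rest = append(rest, i)
		}
		return rest
	default:
		// one_for_one and simple_one_for_one touch only the failed child.
		return []int{failed}
	}
}

// shouldRestart applies the per-child restart type to an exit reason.
func shouldRestart(restart, reason string) bool {
	switch restart {
	case "permanent":
		return true
	case "transient":
		return reason != "normal" && reason != "shutdown"
	default:
		// "temporary" children are never restarted.
		return false
	}
}

func main() {
	fmt.Println(affected("one_for_one", 4, 1))
	fmt.Println(affected("one_for_all", 4, 1))
	fmt.Println(affected("rest_for_one", 4, 1))
	fmt.Println(shouldRestart("transient", "shutdown"), shouldRestart("transient", "crash"))
}
```

Note that affected ignores the restart type: per the comments above, a temporary child terminated by a sibling's death under rest_for_one or one_for_all is not started again.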
const (
	DefaultCallTimeout = 5
)

Variables

var (
	ErrRaftState        = fmt.Errorf("incorrect raft state")
	ErrRaftNoQuorum     = fmt.Errorf("no quorum")
	ErrRaftNoLeader     = fmt.Errorf("no leader")
	ErrRaftNoSerial     = fmt.Errorf("no peers with requested serial")
	ErrRaftBusy         = fmt.Errorf("another append request is in progress")
	ErrRaftWrongTimeout = fmt.Errorf("wrong timeout value")
)
var (
	RaftStatusOK      RaftStatus // nil
	RaftStatusStop    RaftStatus = fmt.Errorf("stop")
	RaftStatusDiscard RaftStatus = fmt.Errorf("discard")

	RaftQuorumState3  RaftQuorumState = 3 // minimum quorum that could make leader election
	RaftQuorumState5  RaftQuorumState = 5
	RaftQuorumState7  RaftQuorumState = 7
	RaftQuorumState9  RaftQuorumState = 9
	RaftQuorumState11 RaftQuorumState = 11 // maximal quorum

)
var (
	SagaStatusOK   SagaStatus // nil
	SagaStatusStop SagaStatus = fmt.Errorf("stop")

	ErrSagaTxEndOfLifespan   = fmt.Errorf("End of TX lifespan")
	ErrSagaTxNextTimeout     = fmt.Errorf("Next saga timeout")
	ErrSagaUnknown           = fmt.Errorf("Unknown saga")
	ErrSagaJobUnknown        = fmt.Errorf("Unknown job")
	ErrSagaTxUnknown         = fmt.Errorf("Unknown TX")
	ErrSagaTxCanceled        = fmt.Errorf("Tx is canceled")
	ErrSagaTxInProgress      = fmt.Errorf("Tx is still in progress")
	ErrSagaResultAlreadySent = fmt.Errorf("Result is already sent")
	ErrSagaNotAllowed        = fmt.Errorf("Operation is not allowed")
)
var (
	ServerStatusOK     ServerStatus = nil
	ServerStatusStop   ServerStatus = fmt.Errorf("stop")
	ServerStatusIgnore ServerStatus = fmt.Errorf("ignore")

	DirectStatusOK     DirectStatus = nil
	DirectStatusIgnore DirectStatus = fmt.Errorf("ignore")
)

Functions

This section is empty.

Types

type Application

type Application struct{}

Application is an implementation of the ProcessBehavior interface

func (*Application) ProcessInit

func (a *Application) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

ProcessInit

func (*Application) ProcessLoop

func (a *Application) ProcessLoop(ps ProcessState, started chan<- bool) string

ProcessLoop

type ApplicationBehavior

type ApplicationBehavior interface {
	ProcessBehavior
	Load(args ...etf.Term) (ApplicationSpec, error)
	Start(process Process, args ...etf.Term)
}

ApplicationBehavior interface

type ApplicationChildSpec

type ApplicationChildSpec struct {
	Child   ProcessBehavior
	Options ProcessOptions
	Name    string
	Args    []etf.Term
	// contains filtered or unexported fields
}

ApplicationChildSpec

type ApplicationInfo

type ApplicationInfo struct {
	Name        string
	Description string
	Version     string
	PID         etf.Pid
}

ApplicationInfo

type ApplicationSpec

type ApplicationSpec struct {
	sync.Mutex
	Name         string
	Description  string
	Version      string
	Lifespan     time.Duration
	Applications []string
	Env          map[EnvKey]interface{}
	Children     []ApplicationChildSpec
	Process      Process
	StartType    ApplicationStartType
}

ApplicationSpec

type ApplicationStartType

type ApplicationStartType = string

type CancelFunc

type CancelFunc func() bool

type Core

type Core interface {

	// ProcessByName returns Process for the given name.
	// Returns nil if it doesn't exist (not found) or terminated.
	ProcessByName(name string) Process

	// ProcessByPid returns Process for the given Pid.
	// Returns nil if it doesn't exist (not found) or terminated.
	ProcessByPid(pid etf.Pid) Process

	// ProcessByAlias returns Process for the given alias.
	// Returns nil if it doesn't exist (not found) or terminated
	ProcessByAlias(alias etf.Alias) Process

	// ProcessInfo returns the details about given Pid
	ProcessInfo(pid etf.Pid) (ProcessInfo, error)

	// ProcessList returns the list of running processes
	ProcessList() []Process

	// MakeRef creates a unique reference within this node
	MakeRef() etf.Ref

	// IsAlias checks whether the given alias belongs to an alive process on this node.
	// If the process died all aliases are cleaned up and this function returns
	// false for the given alias. For alias from the remote node always returns false.
	IsAlias(etf.Alias) bool

	// IsMonitor returns true if the given reference is a monitor
	IsMonitor(ref etf.Ref) bool

	// RegisterBehavior
	RegisterBehavior(group, name string, behavior ProcessBehavior, data interface{}) error
	// RegisteredBehavior
	RegisteredBehavior(group, name string) (RegisteredBehavior, error)
	// RegisteredBehaviorGroup
	RegisteredBehaviorGroup(group string) []RegisteredBehavior
	// UnregisterBehavior
	UnregisterBehavior(group, name string) error
}

Core is the common set of methods provided by the Process and node.Node interfaces

type DirectStatus

type DirectStatus error

type EnvKey

type EnvKey string

EnvKey

type Event

type Event string

type EventMessage

type EventMessage interface{}

type MessageDirectChildren

type MessageDirectChildren struct{}

MessageDirectChildren is the message type used by Process.Children, which returns []etf.Pid. You can handle this type of message in your HandleDirect callback to enable Process.Children support for your gen.Server actor.

type MessageDown

type MessageDown struct {
	Ref       etf.Ref   // a monitor reference
	ProcessID ProcessID // if monitor was created by name
	Pid       etf.Pid
	Reason    string
}

MessageDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorProcess. Reason values:

  • the exit reason of the process
  • 'noproc' (process did not exist at the time of monitor creation)
  • 'noconnection' (no connection to the node where the monitored process resides)
  • 'noproxy' (no connection to the proxy through which this node was connected; the monitored process could still be alive)

func IsMessageDown

func IsMessageDown(message etf.Term) (MessageDown, bool)

IsMessageDown
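
A typical use of this helper is to test an incoming message inside HandleInfo and branch on the Reason values listed above. The sketch below shows that shape with local stand-ins only: MessageDown here uses a string instead of etf.Pid, and isMessageDown is assumed to be a plain type assertion, which may differ from the real function.

```go
package main

import "fmt"

// MessageDown is a local stand-in for gen.MessageDown; the real type carries
// etf.Ref, ProcessID and etf.Pid fields.
type MessageDown struct {
	Pid    string
	Reason string
}

// isMessageDown mirrors the shape of IsMessageDown: it reports whether the
// message is a MessageDown and returns the typed value.
// Assumption: implemented here as a simple type assertion.
func isMessageDown(message interface{}) (MessageDown, bool) {
	md, ok := message.(MessageDown)
	return md, ok
}

// describe shows how a HandleInfo-style callback could branch on Reason.
func describe(message interface{}) string {
	down, ok := isMessageDown(message)
	if !ok {
		return "not a down message"
	}
	switch down.Reason {
	case "noproc":
		return "process did not exist when the monitor was created"
	case "noconnection":
		return "no connection to the node of the monitored process"
	case "noproxy":
		return "no connection to the proxy; the process may still be alive"
	default:
		return "process exited: " + down.Reason
	}
}

func main() {
	fmt.Println(describe(MessageDown{Pid: "worker", Reason: "noproc"}))
	fmt.Println(describe(MessageDown{Pid: "worker", Reason: "normal"}))
	fmt.Println(describe("hello"))
}
```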

type MessageEventDown

type MessageEventDown struct {
	Event  Event
	Reason string
}

MessageEventDown is delivered to a process that monitors an Event when the owner of that Event has terminated

type MessageExit

type MessageExit struct {
	Pid    etf.Pid
	Reason string
}

MessageExit is delivered to the Server's HandleInfo callback when trap exit is enabled using SetTrapExit(true). Reason values:

  • the exit reason of the process
  • 'noproc' (process did not exist at the time of link creation)
  • 'noconnection' (no connection to the node where the linked process resides)
  • 'noproxy' (no connection to the proxy through which this node was connected; the linked process could still be alive)

func IsMessageExit

func IsMessageExit(message etf.Term) (MessageExit, bool)

IsMessageExit
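
The difference between a process that traps exits and one that does not can be shown with a small model. This is a sketch under simplifying assumptions: the process type, its slice mailbox, and exitSignal are hypothetical, and MessageExit uses a string in place of etf.Pid.

```go
package main

import "fmt"

// MessageExit is a local stand-in for gen.MessageExit; the real type carries
// an etf.Pid rather than a string.
type MessageExit struct {
	Pid    string
	Reason string
}

// process is a hypothetical, heavily simplified process model.
type process struct {
	trapExit bool
	alive    bool
	mailbox  []interface{}
}

// exitSignal applies an exit signal sent by a linked process. With trapping
// enabled the signal becomes a regular MessageExit in the mailbox; otherwise
// the receiving process terminates.
func (p *process) exitSignal(from, reason string) {
	if p.trapExit {
		p.mailbox = append(p.mailbox, MessageExit{Pid: from, Reason: reason})
		return
	}
	p.alive = false
}

func main() {
	trapping := &process{trapExit: true, alive: true}
	trapping.exitSignal("peer", "kill")
	fmt.Println(trapping.alive, len(trapping.mailbox))

	plain := &process{alive: true}
	plain.exitSignal("peer", "kill")
	fmt.Println(plain.alive, len(plain.mailbox))
}
```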

type MessageFallback

type MessageFallback struct {
	Process etf.Pid
	Tag     string
	Message etf.Term
}

MessageFallback is delivered to the process specified as the fallback process in ProcessOptions.Fallback.Name when the mailbox has overflowed
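
Overflow forwarding can be pictured with two buffered channels: a message that does not fit into the mailbox is wrapped and sent to the fallback process instead. The deliver function and the channel-based mailboxes below are hypothetical and only illustrate the idea. MessageFallback here uses a string in place of etf.Pid and etf.Term.

```go
package main

import "fmt"

// MessageFallback is a local stand-in for gen.MessageFallback.
type MessageFallback struct {
	Process string
	Tag     string
	Message interface{}
}

// deliver tries the mailbox first without blocking. If the mailbox is full,
// the message is wrapped into MessageFallback and offered to the fallback
// channel. It returns false if neither destination accepted the message.
func deliver(mailbox, fallback chan<- interface{}, self, tag string, msg interface{}) bool {
	select {
	case mailbox <- msg:
		return true
	default:
	}
	select {
	case fallback <- MessageFallback{Process: self, Tag: tag, Message: msg}:
		return true
	default:
		return false
	}
}

func main() {
	mailbox := make(chan interface{}, 1)
	fallback := make(chan interface{}, 1)
	fmt.Println(deliver(mailbox, fallback, "worker", "overflow", "first"))
	fmt.Println(deliver(mailbox, fallback, "worker", "overflow", "second"))
	fmt.Printf("%+v\n", <-fallback)
}
```

The tag lets a single fallback process tell apart the source processes it is collecting overflow for.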

func IsMessageFallback

func IsMessageFallback(message etf.Term) (MessageFallback, bool)

IsMessageFallback

type MessageNodeDown

type MessageNodeDown struct {
	Ref  etf.Ref
	Name string
}

MessageNodeDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode

type MessageProxyDown

type MessageProxyDown struct {
	Ref    etf.Ref
	Node   string
	Proxy  string
	Reason string
}

MessageProxyDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode, if the connection to the node went through proxy nodes and one of them went down.

func IsMessageProxyDown

func IsMessageProxyDown(message etf.Term) (MessageProxyDown, bool)

IsMessageProxyDown

type MessageSagaCancel

type MessageSagaCancel struct {
	TransactionID SagaTransactionID
	NextID        SagaNextID
	Reason        string
}

MessageSagaCancel

type MessageSagaError

type MessageSagaError struct {
	TransactionID SagaTransactionID
	NextID        SagaNextID
	Error         string
	Details       string
}

MessageSagaError

type Pool added in v1.999.222

type Pool struct {
	Server
}

func (*Pool) HandleCall added in v1.999.222

func (p *Pool) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Pool) HandleCast added in v1.999.222

func (p *Pool) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*Pool) HandleInfo added in v1.999.222

func (p *Pool) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*Pool) Init added in v1.999.222

func (p *Pool) Init(process *ServerProcess, args ...etf.Term) error

type PoolBehavior added in v1.999.222

type PoolBehavior interface {
	ServerBehavior

	InitPool(process *PoolProcess, args ...etf.Term) (PoolOptions, error)
}

type PoolOptions added in v1.999.222

type PoolOptions struct {
	NumWorkers    int
	Worker        PoolWorkerBehavior
	WorkerOptions ProcessOptions
	WorkerArgs    []etf.Term
}

type PoolProcess added in v1.999.222

type PoolProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

type PoolWorker added in v1.999.222

type PoolWorker struct {
	Server
}

func (*PoolWorker) HandleInfo added in v1.999.222

func (pw *PoolWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*PoolWorker) HandleWorkerCall added in v1.999.222

func (pw *PoolWorker) HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term

HandleWorkerCall

func (*PoolWorker) HandleWorkerCast added in v1.999.222

func (pw *PoolWorker) HandleWorkerCast(process *PoolWorkerProcess, message etf.Term)

HandleWorkerCast

func (*PoolWorker) HandleWorkerInfo added in v1.999.222

func (pw *PoolWorker) HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term)

HandleWorkerInfo

func (*PoolWorker) Init added in v1.999.222

func (pw *PoolWorker) Init(process *ServerProcess, args ...etf.Term) error

type PoolWorkerBehavior added in v1.999.222

type PoolWorkerBehavior interface {
	ServerBehavior
	InitPoolWorker(process *PoolWorkerProcess, args ...etf.Term) error
	HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term)
	HandleWorkerCast(process *PoolWorkerProcess, message etf.Term)
	HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term
}

type PoolWorkerProcess added in v1.999.222

type PoolWorkerProcess struct {
	ServerProcess
}

type Process

type Process interface {
	Core

	// Spawn creates a new process with this process as its parent
	Spawn(name string, opts ProcessOptions, object ProcessBehavior, args ...etf.Term) (Process, error)

	// RemoteSpawn creates a new process on a remote node. The object name is a behavior
	// registered on the remote node using RegisterBehavior(...). The given options will be stored
	// in the process environment using node.EnvKeyRemoteSpawn as a key
	RemoteSpawn(node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error)
	RemoteSpawnWithTimeout(timeout int, node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error)

	// Name returns process name used on starting.
	Name() string

	// RegisterName associates the name with the pid (it does not override the name registered on starting)
	RegisterName(name string) error

	// UnregisterName unregisters the named process. Only the owner is allowed to unregister the name
	UnregisterName(name string) error

	// NodeName returns node name
	NodeName() string

	// NodeStop stops the node
	NodeStop()

	// NodeUptime returns node lifespan
	NodeUptime() int64

	// Info returns process details
	Info() ProcessInfo

	// Self returns the process identifier (Pid) that belongs to the process
	Self() etf.Pid

	// Direct makes a direct request to the actor (gen.Application, gen.Supervisor, gen.Server or
	// an actor inherited from gen.Server) with the default timeout of 5 seconds
	Direct(request interface{}) (interface{}, error)

	// DirectWithTimeout makes a direct request to the actor with the given timeout (in seconds)
	DirectWithTimeout(request interface{}, timeout int) (interface{}, error)

	// Send sends a message in fashion of 'erlang:send'. The value of 'to' can be a Pid, registered local name
	// or gen.ProcessID{RegisteredName, NodeName}
	Send(to interface{}, message etf.Term) error

	// SendAfter starts a timer. When the timer expires, the message is sent to the process
	// identified by 'to'. 'to' can be a Pid, registered local name or
	// gen.ProcessID{RegisteredName, NodeName}. Returns a cancel function that discards
	// the pending message. CancelFunc returns a bool value. If it returns false, then the timer has
	// already expired and the message has been sent.
	SendAfter(to interface{}, message etf.Term, after time.Duration) CancelFunc

	// Exit initiates a graceful stop of the process
	Exit(reason string) error

	// Kill immediately stops process
	Kill()

	// CreateAlias creates a new alias for the Process
	CreateAlias() (etf.Alias, error)

	// DeleteAlias deletes the given alias
	DeleteAlias(alias etf.Alias) error

	// ListEnv returns a map of configured environment variables.
	// It also includes environment variables from the GroupLeader, Parent and Node.
	// which are overlapped by priority: Process(Parent(GroupLeader(Node)))
	ListEnv() map[EnvKey]interface{}

	// SetEnv set environment variable with given name. Use nil value to remove variable with given name.
	SetEnv(name EnvKey, value interface{})

	// Env returns value associated with given environment name.
	Env(name EnvKey) interface{}

	// Wait waits until process stopped
	Wait()

	// WaitWithTimeout waits until the process has stopped. Returns ErrTimeout
	// if the given timeout is exceeded
	WaitWithTimeout(d time.Duration) error

	// Link creates a link between the calling process and another process.
	// Links are bidirectional and there can only be one link between two processes.
	// Repeated calls to Process.Link(Pid) have no effect. If one of the participants
	// of a link terminates, it sends an exit signal to the other participant, which
	// causes that participant to terminate as well. If the process has set a trap using
	// Process.SetTrapExit(true), the exit signal is transformed into a MessageExit and
	// delivered as a regular message.
	Link(with etf.Pid) error

	// Unlink removes the link, if there is one, between the calling process and
	// the process referred to by Pid.
	Unlink(with etf.Pid) error

	// IsAlive returns whether the process is alive
	IsAlive() bool

	// SetTrapExit enables/disables trapping of exit signals for the process. When a process is trapping exits,
	// it will not terminate when an exit signal is received. Instead, the signal is transformed
	// into a 'gen.MessageExit' which is put into the mailbox of the process just like a regular message.
	SetTrapExit(trap bool)

	// TrapExit returns whether the trap was enabled on this process
	TrapExit() bool

	// Compression returns true if compression is enabled for this process
	Compression() bool

	// SetCompression enables/disables compression for the messages sent outside of this node
	SetCompression(enabled bool)

	// CompressionLevel returns the compression level for the process
	CompressionLevel() int

	// SetCompressionLevel defines compression level. Value must be in range:
	// 1 (best speed) ... 9 (best compression), or -1 for the default compression level
	SetCompressionLevel(level int) bool

	// CompressionThreshold returns compression threshold for the process
	CompressionThreshold() int

	// SetCompressionThreshold defines the minimal size for the message that must be compressed
	// Value must be greater than DefaultCompressionThreshold (1024)
	SetCompressionThreshold(threshold int) bool

	// MonitorNode creates monitor between the current process and node. If Node fails or does not exist,
	// the message MessageNodeDown is delivered to the process.
	MonitorNode(name string) etf.Ref

	// DemonitorNode removes monitor. Returns false if the given reference wasn't found
	DemonitorNode(ref etf.Ref) bool

	// MonitorProcess creates monitor between the processes.
	// Allowed types for the 'process' value: etf.Pid, gen.ProcessID
	// When a process monitor is triggered, a MessageDown sends to the caller.
	// Note: The monitor request is an asynchronous signal. That is, it takes
	// time before the signal reaches its destination.
	MonitorProcess(process interface{}) etf.Ref

	// DemonitorProcess removes monitor. Returns false if the given reference wasn't found
	DemonitorProcess(ref etf.Ref) bool

	// Behavior returns the object this process runs on.
	Behavior() ProcessBehavior
	// GroupLeader returns group leader process. Usually it points to the application process.
	GroupLeader() Process
	// Parent returns parent process. It returns nil if this process was spawned using Node.Spawn.
	Parent() Process
	// Context returns process context.
	Context() context.Context

	// Children returns list of children pid (Application, Supervisor)
	Children() ([]etf.Pid, error)

	// Links returns list of the process pids this process has linked to.
	Links() []etf.Pid
	// Monitors returns the list of processes monitored by this process by pid.
	Monitors() []etf.Pid
	// MonitorsByName returns the list of processes monitored by this process by name.
	MonitorsByName() []ProcessID
	// MonitoredBy returns the list of pids of the processes monitoring this process.
	MonitoredBy() []etf.Pid
	// Aliases returns list of aliases of this process.
	Aliases() []etf.Alias

	// RegisterEvent
	RegisterEvent(event Event, messages ...EventMessage) error
	UnregisterEvent(event Event) error
	MonitorEvent(event Event) error
	DemonitorEvent(event Event) error
	SendEventMessage(event Event, message EventMessage) error

	PutSyncRequest(ref etf.Ref) error
	CancelSyncRequest(ref etf.Ref)
	WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error)
	PutSyncReply(ref etf.Ref, term etf.Term, err error) error
	ProcessChannels() ProcessChannels
}

Process
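
The ListEnv comment describes a layered lookup with the priority Process(Parent(GroupLeader(Node))): a value set closer to the process hides the same key set further out. One way to picture this is a merge from the lowest to the highest priority layer. mergeEnv below is a hypothetical helper, not the package's implementation, and EnvKey is redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// EnvKey mirrors the EnvKey type declared in this package.
type EnvKey string

// mergeEnv overlays environments passed from the lowest to the highest
// priority. Later layers overwrite keys set by earlier ones.
func mergeEnv(layers ...map[EnvKey]interface{}) map[EnvKey]interface{} {
	out := make(map[EnvKey]interface{})
	for _, layer := range layers {
		for k, v := range layer {
			out[k] = v
		}
	}
	return out
}

func main() {
	node := map[EnvKey]interface{}{"region": "eu", "debug": false}
	groupLeader := map[EnvKey]interface{}{"app": "billing"}
	parent := map[EnvKey]interface{}{"debug": true}
	proc := map[EnvKey]interface{}{"name": "worker"}

	// Order of arguments: Node, GroupLeader, Parent, Process.
	env := mergeEnv(node, groupLeader, parent, proc)
	fmt.Println(env["region"], env["app"], env["debug"], env["name"])
}
```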

type ProcessBehavior

type ProcessBehavior interface {
	ProcessInit(Process, ...etf.Term) (ProcessState, error)
	ProcessLoop(ProcessState, chan<- bool) string // method which implements control flow of process
}

ProcessBehavior interface contains methods you should implement to make your own process behavior

type ProcessChannels

type ProcessChannels struct {
	Mailbox      <-chan ProcessMailboxMessage
	Direct       <-chan ProcessDirectMessage
	GracefulExit <-chan ProcessGracefulExitRequest
}

ProcessChannels

type ProcessDirectMessage

type ProcessDirectMessage struct {
	Ref     etf.Ref
	Message interface{}
	Err     error
}

ProcessDirectMessage

type ProcessFallback

type ProcessFallback struct {
	Name string
	Tag  string
}

ProcessFallback

type ProcessGracefulExitRequest

type ProcessGracefulExitRequest struct {
	From   etf.Pid
	Reason string
}

ProcessGracefulExitRequest

type ProcessID

type ProcessID struct {
	Name string
	Node string
}

ProcessID is the long notation of a registered process: {process_name, node_name}

func (ProcessID) String

func (p ProcessID) String() string

String returns the string representation of the ProcessID value

type ProcessInfo

type ProcessInfo struct {
	PID             etf.Pid
	Name            string
	CurrentFunction string
	Status          string
	MessageQueueLen int
	Links           []etf.Pid
	Monitors        []etf.Pid
	MonitorsByName  []ProcessID
	MonitoredBy     []etf.Pid
	Aliases         []etf.Alias
	Dictionary      etf.Map
	TrapExit        bool
	GroupLeader     etf.Pid
	Compression     bool
}

ProcessInfo struct with process details

type ProcessMailboxMessage

type ProcessMailboxMessage struct {
	From    etf.Pid
	Message interface{}
}

ProcessMailboxMessage

type ProcessOptions

type ProcessOptions struct {
	// Context allows mixing the system context with the custom one. E.g., to limit
	// the lifespan using context.WithTimeout. This context MUST be based on the
	// other Process' context. Otherwise, you get the error lib.ErrProcessContext
	Context context.Context
	// MailboxSize defines the length of message queue for the process
	MailboxSize uint16
	// DirectboxSize defines the length of message queue for the direct requests
	DirectboxSize uint16
	// GroupLeader
	GroupLeader Process
	// Env set the process environment variables
	Env map[EnvKey]interface{}

	// Fallback defines the process to where messages will be forwarded
	// if the mailbox is overflowed. The tag value could be used to
	// differentiate the source processes. Forwarded messages are wrapped
	// into the MessageFallback struct.
	Fallback ProcessFallback
}

ProcessOptions

type ProcessState

type ProcessState struct {
	Process
	State interface{}
}

ProcessState

type Raft

type Raft struct {
	Server
}

func (*Raft) HandleCall

func (r *Raft) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleCall

func (*Raft) HandleCancel

func (r *Raft) HandleCancel(process *RaftProcess, ref etf.Ref, reason string) RaftStatus

HandleCancel

func (*Raft) HandleCast

func (r *Raft) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

HandleCast

func (*Raft) HandleInfo

func (r *Raft) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

HandleInfo

func (*Raft) HandleLeader

func (r *Raft) HandleLeader(process *RaftProcess, leader *RaftLeader) RaftStatus

HandleLeader

func (*Raft) HandlePeer

func (r *Raft) HandlePeer(process *RaftProcess, peer etf.Pid, serial uint64) RaftStatus

HandlePeer

func (*Raft) HandleQuorum

func (r *Raft) HandleQuorum(process *RaftProcess, quorum *RaftQuorum) RaftStatus

HandleQuorum

func (*Raft) HandleRaftCall

func (r *Raft) HandleRaftCall(process *RaftProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleRaftCall

func (*Raft) HandleRaftCast

func (r *Raft) HandleRaftCast(process *RaftProcess, message etf.Term) ServerStatus

HandleRaftCast

func (*Raft) HandleRaftDirect

func (r *Raft) HandleRaftDirect(process *RaftProcess, message interface{}) (interface{}, error)

HandleRaftDirect

func (*Raft) HandleRaftInfo

func (r *Raft) HandleRaftInfo(process *RaftProcess, message etf.Term) ServerStatus

HandleRaftInfo

func (*Raft) HandleSerial

func (r *Raft) HandleSerial(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus

HandleSerial

func (*Raft) Init

func (r *Raft) Init(process *ServerProcess, args ...etf.Term) error

type RaftBehavior

type RaftBehavior interface {
	ServerBehavior

	InitRaft(process *RaftProcess, arr ...etf.Term) (RaftOptions, error)

	// HandleAppend is invoked on an append request. To have the leader cancel this request, return RaftStatusDiscard.
	HandleAppend(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus

	// HandleGet
	HandleGet(process *RaftProcess, serial uint64) (string, etf.Term, RaftStatus)

	// HandlePeer
	HandlePeer(process *RaftProcess, peer etf.Pid, serial uint64) RaftStatus

	// HandleQuorum
	HandleQuorum(process *RaftProcess, quorum *RaftQuorum) RaftStatus

	// HandleLeader
	HandleLeader(process *RaftProcess, leader *RaftLeader) RaftStatus

	// HandleCancel
	HandleCancel(process *RaftProcess, ref etf.Ref, reason string) RaftStatus

	// HandleSerial
	HandleSerial(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus

	// HandleRaftCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation
	HandleRaftCall(process *RaftProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleRaftCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation
	HandleRaftCast(process *RaftProcess, message etf.Term) ServerStatus
	// HandleRaftInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleRaftInfo(process *RaftProcess, message etf.Term) ServerStatus
	// HandleRaftDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation
	HandleRaftDirect(process *RaftProcess, message interface{}) (interface{}, error)
}

type RaftLeader

type RaftLeader struct {
	Leader etf.Pid
	Serial uint64
	State  RaftQuorumState
}

type RaftOptions

type RaftOptions struct {
	ID     string // raft cluster id
	Peers  []ProcessID
	Serial uint64 // serial number ("log id" in terms of Raft spec)
}

type RaftProcess

type RaftProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*RaftProcess) Append

func (rp *RaftProcess) Append(key string, value etf.Term) (etf.Ref, error)

Append

func (*RaftProcess) AppendWithTimeout

func (rp *RaftProcess) AppendWithTimeout(key string, value etf.Term, timeout int) (etf.Ref, error)

AppendWithTimeout

func (*RaftProcess) Get

func (rp *RaftProcess) Get(serial uint64) (etf.Ref, error)

Get makes a request to the quorum member to get the data with the given serial number and sets the timeout to the DefaultRaftGetTimeout = 5 sec. It returns ErrRaftNoQuorum if quorum forming is still in progress.

func (*RaftProcess) GetWithTimeout

func (rp *RaftProcess) GetWithTimeout(serial uint64, timeout int) (etf.Ref, error)

GetWithTimeout makes a request to a quorum member to get the data with the given serial number, using the given timeout in seconds. It returns a reference to this request. Once the requested data has arrived, the HandleSerial callback is invoked. If a timeout occurs, the HandleCancel callback is invoked with the reason "timeout".

func (*RaftProcess) Join

func (rp *RaftProcess) Join(peer interface{}) error

Join makes a join request to the given peer, which is expected to be in a raft cluster

func (*RaftProcess) Leader

func (rp *RaftProcess) Leader() *RaftLeader

Leader returns the current leader of the quorum. It returns nil if this process is not part of a quorum or if leader election is still in progress

func (*RaftProcess) Peers

func (rp *RaftProcess) Peers() []etf.Pid

Peers returns the list of processes in the raft cluster. Note that this list is sorted by their Serial value in descending order

func (*RaftProcess) Quorum

func (rp *RaftProcess) Quorum() *RaftQuorum

Quorum returns the current quorum. It returns nil if the quorum hasn't been built yet.

func (*RaftProcess) Serial

func (rp *RaftProcess) Serial() uint64

Serial returns current value of serial for this raft process

type RaftQuorum

type RaftQuorum struct {
	Member bool
	State  RaftQuorumState
	Peers  []etf.Pid // the number of participants in quorum could be 3,5,7,9,11
}
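
The comment on Peers lists the allowed quorum sizes. As a sketch only, the helper below picks the largest listed size that fits a given number of candidates. How the package actually chooses the size is not stated here, so `largestQuorumSize` is a hypothetical helper and this selection rule is an assumption.

```go
package main

import "fmt"

// allowedQuorumSizes mirrors the sizes listed in the RaftQuorum.Peers comment.
var allowedQuorumSizes = []int{3, 5, 7, 9, 11}

// largestQuorumSize returns the largest allowed size that fits into n
// candidates, or 0 if fewer than 3 candidates are available.
// Assumption: "largest that fits" is only one plausible selection rule.
func largestQuorumSize(n int) int {
	size := 0
	for _, s := range allowedQuorumSizes {
		if s <= n {
			size = s
		}
	}
	return size
}

func main() {
	for _, n := range []int{2, 4, 8, 20} {
		fmt.Println(n, largestQuorumSize(n))
	}
}
```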

type RaftQuorumState

type RaftQuorumState int

type RaftStatus

type RaftStatus error

type RegisteredBehavior

type RegisteredBehavior struct {
	Behavior ProcessBehavior
	Data     interface{}
}

RegisteredBehavior

type RemoteSpawnOptions

type RemoteSpawnOptions struct {
	// Name registers the given name for the spawned process
	Name string
	// Monitor enables monitor on the spawned process using provided reference
	Monitor etf.Ref
	// Link enables link between the calling and spawned processes
	Link bool
	// Function in order to support {M,F,A} request to the Erlang node
	Function string
}

RemoteSpawnOptions defines options for RemoteSpawn method

type RemoteSpawnRequest

type RemoteSpawnRequest struct {
	From    etf.Pid
	Ref     etf.Ref
	Options RemoteSpawnOptions
}

RemoteSpawnRequest

type Saga

type Saga struct {
	Server
}

Saga

func (*Saga) HandleCall

func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleCall

func (*Saga) HandleCast

func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

HandleCast

func (*Saga) HandleDirect

func (gs *Saga) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleDirect

func (*Saga) HandleInfo

func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

HandleInfo

func (*Saga) HandleJobFailed

func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus

HandleJobFailed

func (*Saga) HandleJobInterim

func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus

HandleJobInterim

func (*Saga) HandleJobResult

func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus

HandleJobResult

func (*Saga) HandleSagaCall

func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleSagaCall

func (*Saga) HandleSagaCast

func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus

HandleSagaCast

func (*Saga) HandleSagaDirect

func (gs *Saga) HandleSagaDirect(process *SagaProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleSagaDirect

func (*Saga) HandleSagaInfo

func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus

HandleSagaInfo

func (*Saga) HandleTxCommit

func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus

HandleTxCommit

func (*Saga) HandleTxDone

func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)

HandleTxDone

func (*Saga) HandleTxInterim

func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus

HandleTxInterim

func (*Saga) Init

func (gs *Saga) Init(process *ServerProcess, args ...etf.Term) error

Init

func (*Saga) SetMaxTransactions

func (gs *Saga) SetMaxTransactions(process Process, max uint) error

SetMaxTransactions sets the maximum number of transactions for the saga.

type SagaBehavior

type SagaBehavior interface {
	ServerBehavior

	// InitSaga
	InitSaga(process *SagaProcess, args ...etf.Term) (SagaOptions, error)

	// HandleTxNew invoked when this saga receives a new TX.
	HandleTxNew(process *SagaProcess, id SagaTransactionID, value interface{}) SagaStatus

	// HandleTxResult invoked on receiving a result from the next saga
	HandleTxResult(process *SagaProcess, id SagaTransactionID, from SagaNextID, result interface{}) SagaStatus

	// HandleTxCancel invoked on a request of transaction cancelation.
	HandleTxCancel(process *SagaProcess, id SagaTransactionID, reason string) SagaStatus

	// HandleTxDone invoked when the transaction is done on a saga where it was created.
	// It returns the final result and SagaStatus. The commit message will deliver the final
	// result to all participants of this transaction (if it has enabled the TwoPhaseCommit option).
	// Otherwise the final result will be ignored.
	HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)

	// HandleTxInterim invoked if received interim result from the next hop
	HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus

	// HandleTxCommit invoked if TwoPhaseCommit option is enabled for the given TX.
	// All sagas involved in this TX receive a commit message with final value and invoke this callback.
	// The final result has a value returned by HandleTxDone on a Saga created this TX.
	HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus

	// HandleJobResult
	HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
	// HandleJobInterim
	HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus
	// HandleJobFailed
	HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus

	// HandleSagaCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation
	HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleSagaCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation
	HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
	// HandleSagaInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
	// HandleSagaDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation
	HandleSagaDirect(process *SagaProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
}

SagaBehavior interface

type SagaJob

type SagaJob struct {
	ID            SagaJobID
	TransactionID SagaTransactionID
	Value         interface{}
	// contains filtered or unexported fields
}

SagaJob

type SagaJobID

type SagaJobID etf.Ref

SagaJobID

func (SagaJobID) String

func (id SagaJobID) String() string

String

type SagaJobOptions

type SagaJobOptions struct {
	Timeout uint
}

SagaJobOptions

type SagaNext

type SagaNext struct {
	// Saga etf.Pid, string (for the locally registered process), gen.ProcessID{process, node} (for the remote process)
	Saga interface{}
	// Value a value for the invoking HandleTxNew on a next hop.
	Value interface{}
	// Timeout how long this Saga will be waiting for the result from the next hop. Default - 10 seconds
	Timeout uint
	// TrapCancel if the next saga fails, it will transform the cancel signal into the regular message gen.MessageSagaCancel, and HandleSagaInfo callback will be invoked.
	TrapCancel bool
	// contains filtered or unexported fields
}

SagaNext

type SagaNextID

type SagaNextID etf.Ref

SagaNextID

func (SagaNextID) String

func (id SagaNextID) String() string

String

type SagaOptions

type SagaOptions struct {
	// MaxTransactions defines the limit for the number of active transactions. Default: 0 (unlimited)
	MaxTransactions uint
	// Worker
	Worker SagaWorkerBehavior
}

SagaOptions

type SagaProcess

type SagaProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

SagaProcess

func (*SagaProcess) CancelJob

func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error

CancelJob

func (*SagaProcess) CancelTransaction

func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error

CancelTransaction

func (*SagaProcess) Next

func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)

Next

func (*SagaProcess) SendInterim

func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error

SendInterim

func (*SagaProcess) SendResult

func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error

SendResult

func (*SagaProcess) StartJob

func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)

StartJob

func (*SagaProcess) StartTransaction

func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID

StartTransaction

type SagaStatus

type SagaStatus error

SagaStatus

type SagaTransaction

type SagaTransaction struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SagaTransaction

type SagaTransactionID

type SagaTransactionID etf.Ref

SagaTransactionID

func (SagaTransactionID) String

func (id SagaTransactionID) String() string

String

type SagaTransactionOptions

type SagaTransactionOptions struct {
	// HopLimit defines the maximum number of hops within the transaction. Default limit
	// is 0 (no limit).
	HopLimit uint
	// Lifespan defines a lifespan for the transaction in seconds. Default is 60.
	Lifespan uint

	// TwoPhaseCommit enables 2PC for the transaction. This option makes all
	// Sagas involved in this transaction invoke the HandleTxCommit callback and
	// makes their Worker processes invoke the HandleJobCommit callback once the transaction is finished.
	TwoPhaseCommit bool
}

SagaTransactionOptions

type SagaWorker

type SagaWorker struct {
	Server
}

SagaWorker

func (*SagaWorker) HandleCall

func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleCall

func (*SagaWorker) HandleCast

func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

HandleCast

func (*SagaWorker) HandleDirect

func (w *SagaWorker) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleDirect

func (*SagaWorker) HandleInfo

func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

HandleInfo

func (*SagaWorker) HandleJobCommit

func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})

HandleJobCommit

func (*SagaWorker) HandleWorkerCall

func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleWorkerCall

func (*SagaWorker) HandleWorkerCast

func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus

HandleWorkerCast

func (*SagaWorker) HandleWorkerDirect

func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleWorkerDirect

func (*SagaWorker) HandleWorkerInfo

func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus

HandleWorkerInfo

func (*SagaWorker) HandleWorkerTerminate

func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)

HandleWorkerTerminate

func (*SagaWorker) Init

func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error

Init

func (*SagaWorker) Terminate

func (w *SagaWorker) Terminate(process *ServerProcess, reason string)

Terminate

type SagaWorkerBehavior

type SagaWorkerBehavior interface {
	ServerBehavior

	// HandleJobStart invoked on a worker start
	HandleJobStart(process *SagaWorkerProcess, job SagaJob) error
	// HandleJobCancel invoked if transaction was canceled before the termination.
	HandleJobCancel(process *SagaWorkerProcess, reason string)

	// HandleJobCommit invoked if this job was a part of the transaction
	// with enabled TwoPhaseCommit option. All workers involved in this TX
	// handling are receiving this call. Callback invoked before the termination.
	HandleJobCommit(process *SagaWorkerProcess, final interface{})

	// HandleWorkerInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
	// HandleWorkerCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation
	HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
	// HandleWorkerCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation
	HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleWorkerDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation
	HandleWorkerDirect(process *SagaWorkerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

	// HandleWorkerTerminate this callback invoked on a process termination
	HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
}

SagaWorkerBehavior

type SagaWorkerProcess

type SagaWorkerProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

SagaWorkerProcess

func (*SagaWorkerProcess) SendInterim

func (wp *SagaWorkerProcess) SendInterim(interim interface{}) error

SendInterim

func (*SagaWorkerProcess) SendResult

func (wp *SagaWorkerProcess) SendResult(result interface{}) error

SendResult sends the result and terminates this worker if 2PC is disabled. Otherwise, the worker keeps waiting for a cancel/commit signal.

type Server

type Server struct {
	ServerBehavior
}

Server is an implementation of the ProcessBehavior interface for Server objects.

func (*Server) HandleCall

func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleCall

func (*Server) HandleCast

func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

HandleCast

func (*Server) HandleDirect

func (gs *Server) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleDirect

func (*Server) HandleInfo

func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

HandleInfo

func (*Server) Init

func (gs *Server) Init(process *ServerProcess, args ...etf.Term) error

Init

func (*Server) ProcessInit

func (gs *Server) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

ProcessInit

func (*Server) ProcessLoop

func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string

ProcessLoop

func (*Server) Terminate

func (gs *Server) Terminate(process *ServerProcess, reason string)

Terminate

type ServerBehavior

type ServerBehavior interface {
	ProcessBehavior

	// Init invoked on a Server start
	Init(process *ServerProcess, args ...etf.Term) error

	// HandleCast invoked if Server received message sent with ServerProcess.Cast.
	// Return ServerStatusStop to stop server with "normal" reason. Use ServerStatus(error)
	// for the custom reason
	HandleCast(process *ServerProcess, message etf.Term) ServerStatus

	// HandleCall invoked if Server got sync request using ServerProcess.Call
	HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

	// HandleDirect invoked on a direct request made with Process.Direct
	HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

	// HandleInfo invoked if Server received message sent with Process.Send.
	HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

	// Terminate invoked on a termination process. ServerProcess.State is not locked during
	// this callback.
	Terminate(process *ServerProcess, reason string)
}

ServerBehavior interface

type ServerFrom

type ServerFrom struct {
	Pid          etf.Pid
	Ref          etf.Ref
	ReplyByAlias bool
}

ServerFrom

type ServerProcess

type ServerProcess struct {
	ProcessState
	// contains filtered or unexported fields
}

ServerProcess is the state of the Server process.

func (*ServerProcess) Call

func (sp *ServerProcess) Call(to interface{}, message etf.Term) (etf.Term, error)

Call makes outgoing sync request in fashion of 'gen_server:call'. 'to' can be Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}.

func (*ServerProcess) CallWithTimeout

func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)

CallWithTimeout makes an outgoing sync request in the fashion of 'gen_server:call' with the given timeout.

func (*ServerProcess) Cast

func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error

Cast sends a message in fashion of 'gen_server:cast'. 'to' can be a Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}

func (*ServerProcess) CastAfter

func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) CancelFunc

CastAfter a simple wrapper for Process.SendAfter to send a message in fashion of 'gen_server:cast'

func (*ServerProcess) MessageCounter

func (sp *ServerProcess) MessageCounter() uint64

MessageCounter returns the total number of messages handled by Server callbacks: HandleCall, HandleCast, HandleInfo, HandleDirect

func (*ServerProcess) Reply

func (sp *ServerProcess) Reply(ref etf.Ref, reply etf.Term, err error) error

Reply sends the reply to a Process.Direct(...) request that is being handled asynchronously. Such requests can be handled asynchronously by returning gen.DirectStatusIgnore from the HandleDirect callback. In this case, you must reply manually using the gen.ServerProcess.Reply method in any other callback. If the caller has canceled the request due to a timeout, Reply returns lib.ErrReferenceUnknown.

func (*ServerProcess) SendReply

func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error

SendReply sends a reply message to the sender of a ServerProcess.Call request. It is useful with a dispatcher and a pool of workers: the dispatcher process forwards Call requests (asynchronously) from within its HandleCall callback to the worker(s) using ServerProcess.Cast or ServerProcess.Send and returns ServerStatusIgnore instead of ServerStatusOK; the worker process then sends the result using ServerProcess.SendReply with the 'from' value received from the dispatcher.
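
The dispatcher/worker flow above can be modeled with plain goroutines and channels. This is only an analogy and does not use this package: `from`, `request`, `dispatcher`, `worker`, and `call` are hypothetical names, with `from` standing in for gen.ServerFrom.

```go
package main

import "fmt"

// from is a hypothetical stand-in for gen.ServerFrom: it tells a worker
// where the reply has to go.
type from struct {
	reply chan string
}

// request is what the dispatcher forwards to a worker.
type request struct {
	from    from
	message string
}

// dispatcher forwards each request to a worker and does not reply itself
// (the analogue of returning ServerStatusIgnore from HandleCall).
func dispatcher(in <-chan request, workers chan<- request) {
	for req := range in {
		workers <- req
	}
	close(workers)
}

// worker replies straight to the original caller using the forwarded
// 'from' value, which is the role SendReply plays.
func worker(in <-chan request) {
	for req := range in {
		req.from.reply <- "handled: " + req.message
	}
}

// call models the caller side: it blocks until some worker answers.
func call(dispatch chan<- request, message string) string {
	f := from{reply: make(chan string, 1)}
	dispatch <- request{from: f, message: message}
	return <-f.reply
}

func main() {
	toDispatcher := make(chan request)
	toWorker := make(chan request)
	go dispatcher(toDispatcher, toWorker)
	go worker(toWorker)
	fmt.Println(call(toDispatcher, "ping"))
	close(toDispatcher)
}
```

The caller never learns which process answered; it only waits on the reply address it handed out, which is why the dispatcher can stay free to accept the next request.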

type ServerStatus

type ServerStatus error

ServerStatus

func ServerStatusStopWithReason

func ServerStatusStopWithReason(s string) ServerStatus

ServerStatusStopWithReason

type Stage

type Stage struct {
	Server
}

func (*Stage) HandleCall

func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Stage) HandleCancel

func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus

HandleCancel

func (*Stage) HandleCanceled

func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus

HandleCanceled

func (*Stage) HandleCast

func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*Stage) HandleDemand

func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)

HandleDemand

func (*Stage) HandleDirect

func (gst *Stage) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

func (*Stage) HandleEvents

func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus

HandleEvents

func (*Stage) HandleInfo

func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*Stage) HandleStageCall

func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleStageCall

func (*Stage) HandleStageCast

func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus

HandleStageCast

func (*Stage) HandleStageDirect

func (gst *Stage) HandleStageDirect(process *StageProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleStageDirect

func (*Stage) HandleStageInfo

func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus

HandleStageInfo

func (*Stage) HandleStageTerminate

func (gst *Stage) HandleStageTerminate(process *StageProcess, reason string)

func (*Stage) HandleSubscribe

func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus

HandleSubscribe

func (*Stage) HandleSubscribed

func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)

HandleSubscribed

func (*Stage) Init

func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error

gen.Server callbacks

func (*Stage) InitStage

func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error

InitStage

func (*Stage) SetCancelMode

func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error

SetCancelMode defines how consumer will handle termination of the producer. There are 3 modes:
StageCancelPermanent (default) - consumer exits when the producer cancels or exits
StageCancelTransient - consumer exits only if reason is not normal, shutdown, or {shutdown, reason}
StageCancelTemporary - never exits

func (*Stage) Terminate

func (gst *Stage) Terminate(process *ServerProcess, reason string)

type StageBehavior

type StageBehavior interface {
	ServerBehavior

	// InitStage
	InitStage(process *StageProcess, args ...etf.Term) (StageOptions, error)

	// HandleDemand this callback is invoked on a producer stage
	// The producer that implements this callback must either store the demand, or return the amount of requested events.
	HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)

	// HandleEvents this callback is invoked on a consumer stage.
	HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus

	// HandleSubscribe This callback is invoked on a producer stage.
	HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus

	// HandleSubscribed this callback is invoked as a confirmation for the subscription request
	// Returning false means that demand must be sent to producers explicitly using Ask method.
	// Returning true means the stage implementation will take care of automatically sending demand.
	HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)

	// HandleCancel
	// Invoked when a consumer is no longer subscribed to a producer (invoked on a producer stage)
	// The cancelReason will be a {Cancel: "cancel", Reason: _} if the reason for cancellation
	// was a Stage.Cancel call. Any other value means the cancellation reason was
	// due to an EXIT.
	HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus

	// HandleCanceled
	// Invoked when a consumer is no longer subscribed to a producer (invoked on a consumer stage)
	// Termination of this stage depends on the cancel mode for the given subscription. For the cancel mode
	// StageCancelPermanent, this stage will be terminated right after this callback is invoked.
	// For the cancel mode StageCancelTransient, it depends on the reason for canceling the subscription.
	// The cancel mode StageCancelTemporary keeps this stage alive whatever the reason is.
	HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus

	// HandleStageCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation
	HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleStageDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation
	HandleStageDirect(process *StageProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
	// HandleStageCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation
	HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
	// HandleStageInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
	// HandleStageTerminate this callback is invoked on a termination process
	HandleStageTerminate(process *StageProcess, reason string)
}

StageBehavior interface for the Stage implementation

type StageCancelMode

type StageCancelMode uint
const (
	StageCancelPermanent StageCancelMode = 0
	StageCancelTransient StageCancelMode = 1
	StageCancelTemporary StageCancelMode = 2
)

type StageCancelReason

type StageCancelReason struct {
	Cancel string
	Reason string
}

type StageDispatchItem

type StageDispatchItem struct {
	// contains filtered or unexported fields
}

type StageDispatcher

type StageDispatcher int

StageDispatcher

type StageDispatcherBehavior

type StageDispatcherBehavior interface {
	// InitStageDispatcher(opts)
	Init(opts StageOptions) (state interface{})

	// Ask called every time a consumer sends demand
	Ask(state interface{}, subscription StageSubscription, count uint)

	// Cancel called every time a subscription is cancelled or the consumer goes down.
	Cancel(state interface{}, subscription StageSubscription)

	// Dispatch called every time a producer wants to dispatch an event.
	Dispatch(state interface{}, events etf.List) []StageDispatchItem

	// Subscribe called every time the producer gets a new subscriber
	Subscribe(state interface{}, subscription StageSubscription, opts StageSubscribeOptions) error
}

StageDispatcherBehavior defines the interface for a dispatcher implementation. To create a custom dispatcher, implement this interface and set it as the Dispatcher in StageOptions.

func CreateStageDispatcherBroadcast

func CreateStageDispatcherBroadcast() StageDispatcherBehavior

CreateStageDispatcherBroadcast creates a dispatcher that accumulates demand from all consumers before broadcasting events to all of them. This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer. The demand is only sent upstream once all consumers ask for data.
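
A minimal sketch of the rule described above, not the dispatcher's actual code: the number of events that can be broadcast is bounded by the smallest pending demand. `broadcastable` is a hypothetical helper, and consumers are keyed by made-up names.

```go
package main

import "fmt"

// broadcastable returns how many events can be broadcast right now: the
// smallest pending demand across all consumers. The result is 0 while any
// consumer has not asked for data, or when there are no consumers at all.
func broadcastable(pending map[string]uint) uint {
	var lowest uint
	first := true
	for _, d := range pending {
		if first || d < lowest {
			lowest = d
			first = false
		}
	}
	return lowest
}

func main() {
	fmt.Println(broadcastable(map[string]uint{"a": 5, "b": 2, "c": 7}))
	fmt.Println(broadcastable(map[string]uint{"a": 5, "b": 0}))
}
```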

func CreateStageDispatcherDemand

func CreateStageDispatcherDemand() StageDispatcherBehavior

CreateStageDispatcherDemand creates a dispatcher that sends batches to the highest demand. This is the default dispatcher used by Stage. In order to avoid greedy consumers, it is recommended that all consumers have exactly the same maximum demand.
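
To see why unequal maximum demand leads to greedy consumers, here is a simplified model of "send batches to the highest demand". It is a sketch under assumptions, not the package's dispatcher: `consumer`, `highestDemand`, and `dispatch` are hypothetical names.

```go
package main

import "fmt"

// consumer is a hypothetical record of one subscriber's pending demand.
type consumer struct {
	name   string
	demand uint
}

// highestDemand returns the index of the consumer with the largest pending
// demand, or -1 if no consumer is asking for events. Ties go to the first.
func highestDemand(cs []consumer) int {
	best := -1
	for i, c := range cs {
		if c.demand == 0 {
			continue
		}
		if best == -1 || c.demand > cs[best].demand {
			best = i
		}
	}
	return best
}

// dispatch hands out events in batches, always to the consumer that
// currently has the highest demand, and returns how many each one received.
func dispatch(cs []consumer, events int) map[string]int {
	sent := map[string]int{}
	for events > 0 {
		i := highestDemand(cs)
		if i == -1 {
			break // no demand left; a real producer would buffer the rest
		}
		batch := int(cs[i].demand)
		if batch > events {
			batch = events
		}
		cs[i].demand -= uint(batch)
		sent[cs[i].name] += batch
		events -= batch
	}
	return sent
}

func main() {
	cs := []consumer{{"slow", 2}, {"greedy", 10}}
	fmt.Println(dispatch(cs, 11))
}
```

In this model the consumer asking for 10 events receives almost everything, which is the imbalance the recommendation about equal maximum demand is meant to avoid.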

func CreateStageDispatcherPartition

func CreateStageDispatcherPartition(n uint, hash func(etf.Term) int) StageDispatcherBehavior

CreateStageDispatcherPartition creates a dispatcher that sends events according to partitions. The number of partitions 'n' must be > 0. 'hash' should return a number within the range [0,n). A value outside of this range discards the event. If 'hash' is nil, a random partition is used for every event.
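
One possible shape for such a hash function, shown as a sketch. The real parameter type is `func(etf.Term) int`; `interface{}` is used here only to keep the example free of this package's imports. The `partitions` constant and `hashByKey` are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitions is the hypothetical number of partitions ('n' above).
const partitions = 4

// hashByKey maps string events onto [0, partitions) using FNV-1a.
// Non-string events get -1, which lies outside the range and, per the
// description above, would cause the event to be discarded.
func hashByKey(event interface{}) int {
	key, ok := event.(string)
	if !ok {
		return -1
	}
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % partitions)
}

func main() {
	for _, e := range []interface{}{"user-1", "user-2", 42} {
		fmt.Println(e, hashByKey(e))
	}
}
```

Because the hash depends only on the event's key, equal keys always land in the same partition.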

type StageOptions

type StageOptions struct {

	// DisableDemandHandle. By default, demand is always handled using the HandleDemand callback.
	// When this option is set to 'true', demand is accumulated until handling is
	// re-enabled using the SetDemandHandle(true) method.
	DisableDemandHandle bool

	// BufferSize the size of the buffer to store events without demand.
	// default value = defaultDispatcherBufferSize
	BufferSize uint

	// BufferKeepLast defines whether the first or last entries should be
	// kept on the buffer in case the buffer size is exceeded.
	BufferKeepLast bool

	Dispatcher StageDispatcherBehavior
}

StageOptions defines the producer configuration returned by the InitStage callback. It is ignored if the stage acts as a consumer only.
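
BufferSize and BufferKeepLast together describe a bounded buffer. The sketch below shows one reading of those two options using plain integers as events. `bufferEvents` is a hypothetical helper, not the package's buffer.

```go
package main

import "fmt"

// bufferEvents appends incoming events to a bounded buffer. When the size
// limit is exceeded, keepLast decides whether the newest entries (true) or
// the oldest entries (false) survive.
func bufferEvents(buf, incoming []int, size int, keepLast bool) []int {
	all := append(append([]int{}, buf...), incoming...)
	if len(all) <= size {
		return all
	}
	if keepLast {
		return all[len(all)-size:]
	}
	return all[:size]
}

func main() {
	buf := []int{1, 2, 3}
	fmt.Println(bufferEvents(buf, []int{4, 5}, 4, true))
	fmt.Println(bufferEvents(buf, []int{4, 5}, 4, false))
}
```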

type StageProcess

type StageProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*StageProcess) Ask

func (p *StageProcess) Ask(subscription StageSubscription, count uint) error

Ask makes a demand request for the given subscription. This function must only be used when the consumer has set the subscription to manual mode (for example, with SetAutoDemand(subscription, false) or the ManualDemand subscription option).

func (*StageProcess) AutoDemand

func (p *StageProcess) AutoDemand(subscription StageSubscription) (bool, error)

AutoDemand returns value of the auto demand option

func (*StageProcess) Cancel

func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error

Cancel

func (*StageProcess) CancelMode

func (p *StageProcess) CancelMode(subscription StageSubscription) (StageCancelMode, error)

CancelMode returns current cancel mode for the consumer

func (*StageProcess) DemandHandle

func (p *StageProcess) DemandHandle() bool

DemandHandle reports whether handling of demand requests is enabled.

func (*StageProcess) SendEvents

func (p *StageProcess) SendEvents(events etf.List) error

SendEvents sends events to the subscribers

func (*StageProcess) SetAutoDemand

func (p *StageProcess) SetAutoDemand(subscription StageSubscription, autodemand bool) error

SetAutoDemand sets the auto demand option. Setting this option to false means that demand must be sent to producers explicitly using the Ask method. This mode can be used when a special behavior is desired. Setting this option to true enables auto demand mode (the default mode for the consumer).

func (*StageProcess) SetCancelMode

func (p *StageProcess) SetCancelMode(subscription StageSubscription, mode StageCancelMode) error

SetCancelMode defines how consumer will handle termination of the producer. There are 3 modes:
StageCancelPermanent (default) - consumer exits when the producer cancels or exits
StageCancelTransient - consumer exits only if reason is not normal, shutdown, or {shutdown, reason}
StageCancelTemporary - never exits
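
The three modes amount to a small decision table. The sketch below mirrors it with local, hypothetical names (`cancelMode`, `consumerExits`) instead of this package's types, and models only plain string reasons.

```go
package main

import "fmt"

// cancelMode mirrors gen.StageCancelMode for illustration only.
type cancelMode uint

const (
	cancelPermanent cancelMode = iota
	cancelTransient
	cancelTemporary
)

// consumerExits reports whether a consumer with the given cancel mode would
// exit for the given reason. Only plain string reasons are modeled; the
// {shutdown, reason} tuple form is left out of this sketch.
func consumerExits(mode cancelMode, reason string) bool {
	switch mode {
	case cancelPermanent:
		return true
	case cancelTransient:
		return reason != "normal" && reason != "shutdown"
	default: // cancelTemporary
		return false
	}
}

func main() {
	fmt.Println(consumerExits(cancelPermanent, "normal"))
	fmt.Println(consumerExits(cancelTransient, "normal"))
	fmt.Println(consumerExits(cancelTransient, "kill"))
	fmt.Println(consumerExits(cancelTemporary, "kill"))
}
```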

func (*StageProcess) SetDemandHandle

func (p *StageProcess) SetDemandHandle(enable bool)

SetDemandHandle setting this option to false disables handling demand requests on a producer stage. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. By default this option is true.
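
The synchronization idea can be sketched with a toy producer that counts demand. All names here (`producer`, `ask`, `setDemandHandle`) are hypothetical, and flushing the accumulated demand at the moment handling is re-enabled is an assumption of this sketch, not a documented guarantee.

```go
package main

import "fmt"

// producer is a hypothetical model of a producer stage's demand handling.
type producer struct {
	handle  bool // whether demand is handled immediately
	pending uint // demand accumulated while handling is disabled
	served  uint // demand passed on to the (imaginary) HandleDemand
}

// ask records a demand request from a consumer.
func (p *producer) ask(count uint) {
	if !p.handle {
		p.pending += count
		return
	}
	p.served += count
}

// setDemandHandle toggles handling. Assumption: enabling it hands over
// everything that was accumulated in the meantime.
func (p *producer) setDemandHandle(enable bool) {
	p.handle = enable
	if enable {
		p.served += p.pending
		p.pending = 0
	}
}

func main() {
	p := &producer{handle: false}
	p.ask(5) // first consumer subscribes and asks
	p.ask(3) // second consumer subscribes and asks
	fmt.Println(p.pending, p.served)
	p.setDemandHandle(true) // all consumers are in place
	fmt.Println(p.pending, p.served)
}
```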

func (*StageProcess) Subscribe

func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)

Subscribe subscribes to the given producer. HandleSubscribed callback will be invoked on a consumer stage once a request for the subscription is sent. If something went wrong on a producer side the callback HandleCancel will be invoked with a reason of cancelation.

type StageStatus

type StageStatus error
var (
	StageStatusOK           StageStatus = nil
	StageStatusStop         StageStatus = fmt.Errorf("stop")
	StageStatusUnsupported  StageStatus = fmt.Errorf("unsupported")
	StageStatusNotAProducer StageStatus = fmt.Errorf("not a producer")
)

type StageSubscribeOptions

type StageSubscribeOptions struct {
	MinDemand uint `etf:"min_demand"`
	MaxDemand uint `etf:"max_demand"`
	// The stage implementation will take care of automatically sending
	// demand to producer (as a default behavior). You can disable it
	// setting ManualDemand to true
	ManualDemand bool `etf:"manual"`
	// What should happen to the consumer if the producer has terminated:
	// StageCancelPermanent the consumer exits when the producer cancels or exits.
	// StageCancelTransient the consumer exits only if reason is not "normal",
	// "shutdown", or {"shutdown", _}
	// StageCancelTemporary the consumer never exits
	Cancel StageCancelMode `etf:"cancel"`

	// Partition defines the number of the partition this subscription should belong to.
	// This option is used by the partition dispatcher (see CreateStageDispatcherPartition).
	Partition uint `etf:"partition"`

	// Extra is intended to be a custom set of options for the custom implementation
	// of StageDispatcherBehavior
	Extra etf.Term `etf:"extra"`
}

type StageSubscription

type StageSubscription struct {
	Pid etf.Pid
	ID  etf.Ref
}

type Supervisor

type Supervisor struct{}

Supervisor is an implementation of the ProcessBehavior interface.

func (*Supervisor) ProcessInit

func (sv *Supervisor) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

ProcessInit

func (*Supervisor) ProcessLoop

func (sv *Supervisor) ProcessLoop(ps ProcessState, started chan<- bool) string

ProcessLoop

func (*Supervisor) StartChild

func (sv *Supervisor) StartChild(supervisor Process, name string, args ...etf.Term) (Process, error)

StartChild dynamically starts a child process from the child spec with the given name, as defined by the Init call.

type SupervisorBehavior

type SupervisorBehavior interface {
	ProcessBehavior
	Init(args ...etf.Term) (SupervisorSpec, error)
}

SupervisorBehavior interface

type SupervisorChildSpec

type SupervisorChildSpec struct {
	Name    string
	Child   ProcessBehavior
	Options ProcessOptions
	Args    []etf.Term
	// contains filtered or unexported fields
}

SupervisorChildSpec

type SupervisorSpec

type SupervisorSpec struct {
	Name     string
	Children []SupervisorChildSpec
	Strategy SupervisorStrategy
	// contains filtered or unexported fields
}

SupervisorSpec

type SupervisorStrategy

type SupervisorStrategy struct {
	Type      SupervisorStrategyType
	Intensity uint16
	Period    uint16
	Restart   SupervisorStrategyRestart
}

SupervisorStrategy

type SupervisorStrategyRestart

type SupervisorStrategyRestart = string

SupervisorStrategyRestart

type SupervisorStrategyType

type SupervisorStrategyType = string

SupervisorStrategyType

type TCP

type TCP struct {
	Server
}

func (*TCP) HandleCall

func (tcp *TCP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*TCP) HandleCast

func (tcp *TCP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*TCP) HandleInfo

func (tcp *TCP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*TCP) HandleTCPCall

func (tcp *TCP) HandleTCPCall(process *TCPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleTCPCall

func (*TCP) HandleTCPCast

func (tcp *TCP) HandleTCPCast(process *TCPProcess, message etf.Term) ServerStatus

HandleTCPCast

func (*TCP) HandleTCPInfo

func (tcp *TCP) HandleTCPInfo(process *TCPProcess, message etf.Term) ServerStatus

HandleTCPInfo

func (*TCP) HandleTCPTerminate

func (tcp *TCP) HandleTCPTerminate(process *TCPProcess, reason string)

func (*TCP) Init

func (tcp *TCP) Init(process *ServerProcess, args ...etf.Term) error

Server callbacks

func (*TCP) Terminate

func (tcp *TCP) Terminate(process *ServerProcess, reason string)

type TCPBehavior

type TCPBehavior interface {
	ServerBehavior

	InitTCP(process *TCPProcess, args ...etf.Term) (TCPOptions, error)

	HandleTCPCall(process *TCPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleTCPCast(process *TCPProcess, message etf.Term) ServerStatus
	HandleTCPInfo(process *TCPProcess, message etf.Term) ServerStatus

	HandleTCPTerminate(process *TCPProcess, reason string)
}

type TCPConnection

type TCPConnection struct {
	Addr   net.Addr
	Socket io.Writer
	State  interface{}
}

type TCPHandler

type TCPHandler struct {
	Server
	// contains filtered or unexported fields
}

func (*TCPHandler) HandleCall

func (tcph *TCPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*TCPHandler) HandleCast

func (tcph *TCPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*TCPHandler) HandleConnect

func (tcph *TCPHandler) HandleConnect(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus

func (*TCPHandler) HandleDirect

func (tcph *TCPHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

func (*TCPHandler) HandleDisconnect

func (tcph *TCPHandler) HandleDisconnect(process *TCPHandlerProcess, conn *TCPConnection)

func (*TCPHandler) HandleInfo

func (tcph *TCPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*TCPHandler) HandleTCPHandlerCall

func (tcph *TCPHandler) HandleTCPHandlerCall(process *TCPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleTCPHandlerCall

func (*TCPHandler) HandleTCPHandlerCast

func (tcph *TCPHandler) HandleTCPHandlerCast(process *TCPHandlerProcess, message etf.Term) ServerStatus

HandleTCPHandlerCast

func (*TCPHandler) HandleTCPHandlerInfo

func (tcph *TCPHandler) HandleTCPHandlerInfo(process *TCPHandlerProcess, message etf.Term) ServerStatus

HandleTCPHandlerInfo

func (*TCPHandler) HandleTCPHandlerTerminate

func (tcph *TCPHandler) HandleTCPHandlerTerminate(process *TCPHandlerProcess, reason string)

func (*TCPHandler) HandleTimeout

func (tcph *TCPHandler) HandleTimeout(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus

func (*TCPHandler) Init

func (tcph *TCPHandler) Init(process *ServerProcess, args ...etf.Term) error

func (*TCPHandler) Terminate

func (tcph *TCPHandler) Terminate(process *ServerProcess, reason string)

type TCPHandlerBehavior

type TCPHandlerBehavior interface {
	ServerBehavior

	// Mandatory callback
	HandlePacket(process *TCPHandlerProcess, packet []byte, conn *TCPConnection) (int, int, TCPHandlerStatus)

	// Optional callbacks
	HandleConnect(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus
	HandleDisconnect(process *TCPHandlerProcess, conn *TCPConnection)
	HandleTimeout(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus

	HandleTCPHandlerCall(process *TCPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleTCPHandlerCast(process *TCPHandlerProcess, message etf.Term) ServerStatus
	HandleTCPHandlerInfo(process *TCPHandlerProcess, message etf.Term) ServerStatus
	HandleTCPHandlerTerminate(process *TCPHandlerProcess, reason string)
}

type TCPHandlerProcess

type TCPHandlerProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*TCPHandlerProcess) SetTrapExit

func (tcpp *TCPHandlerProcess) SetTrapExit(trap bool)

SetTrapExit is overridden to disable trapping exits for the TCPHandlerProcess.

type TCPHandlerStatus

type TCPHandlerStatus error
var (
	TCPHandlerStatusOK    TCPHandlerStatus = nil
	TCPHandlerStatusClose TCPHandlerStatus = fmt.Errorf("close")
)

type TCPOptions

type TCPOptions struct {
	Host            string
	Port            uint16
	TLS             *tls.Config
	KeepAlivePeriod int
	Handler         TCPHandlerBehavior
	// QueueLength defines how many parallel requests can be directed to this process. Default value is 10.
	QueueLength int
	// NumHandlers defines how many handlers will be started. Default value is 1.
	NumHandlers int
	// IdleTimeout defines how long (in seconds) a started handler is kept alive with no packets. A zero value means the handler never stops.
	IdleTimeout     int
	DeadlineTimeout int
	MaxPacketSize   int
	// ExtraHandlers enables starting new handlers if all handlers in the pool are busy.
	ExtraHandlers bool
}
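
The documented defaults (QueueLength 10, NumHandlers 1) can be read as "fill in unset fields". The sketch below shows that reading on a trimmed local copy of the struct. The withDefaults helper is hypothetical; how the package applies its defaults internally is not shown in this section.

```go
package main

import "fmt"

// options is a trimmed local mirror of TCPOptions (only the fields used here).
type options struct {
	QueueLength int
	NumHandlers int
	IdleTimeout int
}

// withDefaults fills in the documented defaults for fields left unset.
func withDefaults(o options) options {
	if o.QueueLength < 1 {
		o.QueueLength = 10
	}
	if o.NumHandlers < 1 {
		o.NumHandlers = 1
	}
	// IdleTimeout stays as is: zero means the handler never stops.
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(options{NumHandlers: 4}))
	// {QueueLength:10 NumHandlers:4 IdleTimeout:0}
}
```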

type TCPProcess

type TCPProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

type TCPStatus

type TCPStatus error
var (
	TCPStatusOK   TCPStatus
	TCPStatusStop TCPStatus = fmt.Errorf("stop")
)

type UDP

type UDP struct {
	Server
}

func (*UDP) HandleCall

func (udp *UDP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*UDP) HandleCast

func (udp *UDP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*UDP) HandleInfo

func (udp *UDP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*UDP) HandleUDPCall

func (udp *UDP) HandleUDPCall(process *UDPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleUDPCall

func (*UDP) HandleUDPCast

func (udp *UDP) HandleUDPCast(process *UDPProcess, message etf.Term) ServerStatus

HandleUDPCast

func (*UDP) HandleUDPInfo

func (udp *UDP) HandleUDPInfo(process *UDPProcess, message etf.Term) ServerStatus

HandleUDPInfo

func (*UDP) HandleUDPTerminate

func (udp *UDP) HandleUDPTerminate(process *UDPProcess, reason string)

func (*UDP) Init

func (udp *UDP) Init(process *ServerProcess, args ...etf.Term) error

Server callbacks

func (*UDP) Terminate

func (udp *UDP) Terminate(process *ServerProcess, reason string)

type UDPBehavior

type UDPBehavior interface {
	ServerBehavior

	InitUDP(process *UDPProcess, args ...etf.Term) (UDPOptions, error)

	HandleUDPCall(process *UDPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleUDPCast(process *UDPProcess, message etf.Term) ServerStatus
	HandleUDPInfo(process *UDPProcess, message etf.Term) ServerStatus

	HandleUDPTerminate(process *UDPProcess, reason string)
}

type UDPHandler

type UDPHandler struct {
	Server
}

func (*UDPHandler) HandleCall

func (udph *UDPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*UDPHandler) HandleCast

func (udph *UDPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*UDPHandler) HandleInfo

func (udph *UDPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*UDPHandler) HandleTimeout

func (udph *UDPHandler) HandleTimeout(process *UDPHandlerProcess)

func (*UDPHandler) HandleUDPHandlerCall

func (udph *UDPHandler) HandleUDPHandlerCall(process *UDPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleUDPHandlerCall

func (*UDPHandler) HandleUDPHandlerCast

func (udph *UDPHandler) HandleUDPHandlerCast(process *UDPHandlerProcess, message etf.Term) ServerStatus

HandleUDPHandlerCast

func (*UDPHandler) HandleUDPHandlerInfo

func (udph *UDPHandler) HandleUDPHandlerInfo(process *UDPHandlerProcess, message etf.Term) ServerStatus

HandleUDPHandlerInfo

func (*UDPHandler) HandleUDPHandlerTerminate

func (udph *UDPHandler) HandleUDPHandlerTerminate(process *UDPHandlerProcess, reason string)

func (*UDPHandler) Init

func (udph *UDPHandler) Init(process *ServerProcess, args ...etf.Term) error

func (*UDPHandler) Terminate

func (udph *UDPHandler) Terminate(process *ServerProcess, reason string)

type UDPHandlerBehavior

type UDPHandlerBehavior interface {
	ServerBehavior

	// Mandatory callback
	HandlePacket(process *UDPHandlerProcess, data []byte, packet UDPPacket)

	// Optional callbacks
	HandleTimeout(process *UDPHandlerProcess)

	HandleUDPHandlerCall(process *UDPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleUDPHandlerCast(process *UDPHandlerProcess, message etf.Term) ServerStatus
	HandleUDPHandlerInfo(process *UDPHandlerProcess, message etf.Term) ServerStatus
	HandleUDPHandlerTerminate(process *UDPHandlerProcess, reason string)
}

type UDPHandlerProcess

type UDPHandlerProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*UDPHandlerProcess) SetTrapExit

func (udpp *UDPHandlerProcess) SetTrapExit(trap bool)

SetTrapExit is overridden to disable trapping exits for the UDPHandlerProcess.

type UDPOptions

type UDPOptions struct {
	Host string
	Port uint16

	Handler         UDPHandlerBehavior
	NumHandlers     int
	IdleTimeout     int
	DeadlineTimeout int
	QueueLength     int
	MaxPacketSize   int
	ExtraHandlers   bool
}

type UDPPacket

type UDPPacket struct {
	Addr   net.Addr
	Socket io.Writer
}

type UDPProcess

type UDPProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

type UDPStatus

type UDPStatus error
var (
	UDPStatusOK   UDPStatus
	UDPStatusStop UDPStatus = fmt.Errorf("stop")
)

type Web

type Web struct {
	Server
}

func (*Web) HandleCall

func (web *Web) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleCall

func (*Web) HandleCast

func (web *Web) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

HandleCast

func (*Web) HandleDirect

func (web *Web) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

HandleDirect

func (*Web) HandleInfo

func (web *Web) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

HandleInfo

func (*Web) HandleWebCall

func (web *Web) HandleWebCall(process *WebProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleWebCall

func (*Web) HandleWebCast

func (web *Web) HandleWebCast(process *WebProcess, message etf.Term) ServerStatus

HandleWebCast

func (*Web) HandleWebInfo

func (web *Web) HandleWebInfo(process *WebProcess, message etf.Term) ServerStatus

HandleWebInfo

func (*Web) Init

func (web *Web) Init(process *ServerProcess, args ...etf.Term) error

func (*Web) Terminate

func (web *Web) Terminate(process *ServerProcess, reason string)

type WebBehavior

type WebBehavior interface {
	ServerBehavior
	// mandatory method
	InitWeb(process *WebProcess, args ...etf.Term) (WebOptions, error)

	// optional methods
	HandleWebCall(process *WebProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleWebCast(process *WebProcess, message etf.Term) ServerStatus
	HandleWebInfo(process *WebProcess, message etf.Term) ServerStatus
}

type WebHandler

type WebHandler struct {
	Server
	// contains filtered or unexported fields
}

func (*WebHandler) HandleCall

func (wh *WebHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*WebHandler) HandleCast

func (wh *WebHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*WebHandler) HandleDirect

func (wh *WebHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)

func (*WebHandler) HandleInfo

func (wh *WebHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*WebHandler) HandleWebHandlerCall

func (wh *WebHandler) HandleWebHandlerCall(process *WebHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

HandleWebHandlerCall

func (*WebHandler) HandleWebHandlerCast

func (wh *WebHandler) HandleWebHandlerCast(process *WebHandlerProcess, message etf.Term) ServerStatus

HandleWebHandlerCast

func (*WebHandler) HandleWebHandlerInfo

func (wh *WebHandler) HandleWebHandlerInfo(process *WebHandlerProcess, message etf.Term) ServerStatus

HandleWebHandlerInfo

func (*WebHandler) HandleWebHandlerTerminate

func (wh *WebHandler) HandleWebHandlerTerminate(process *WebHandlerProcess, reason string, count int64)

func (*WebHandler) Init

func (wh *WebHandler) Init(process *ServerProcess, args ...etf.Term) error

func (*WebHandler) ServeHTTP

func (wh *WebHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*WebHandler) Terminate

func (wh *WebHandler) Terminate(process *ServerProcess, reason string)

type WebHandlerBehavior

type WebHandlerBehavior interface {
	ServerBehavior

	// Mandatory callback
	HandleRequest(process *WebHandlerProcess, request WebMessageRequest) WebHandlerStatus

	// Optional callbacks
	HandleWebHandlerCall(process *WebHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	HandleWebHandlerCast(process *WebHandlerProcess, message etf.Term) ServerStatus
	HandleWebHandlerInfo(process *WebHandlerProcess, message etf.Term) ServerStatus
	HandleWebHandlerTerminate(process *WebHandlerProcess, reason string, count int64)
	// contains filtered or unexported methods
}

type WebHandlerOptions

type WebHandlerOptions struct {
	// Timeout for web-requests. The default timeout is 5 seconds. It can also be
	// overridden within HTTP requests using the header 'Request-Timeout'
	RequestTimeout int
	// RequestQueueLength defines how many parallel requests can be directed to this process. Default value is 10.
	RequestQueueLength int
	// NumHandlers defines how many handlers will be started. Default value is 1.
	NumHandlers int
	// IdleTimeout defines how long (in seconds) a started handler is kept alive with no requests. A zero value means the handler never stops.
	IdleTimeout int
}

type WebHandlerProcess

type WebHandlerProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*WebHandlerProcess) SetTrapExit

func (whp *WebHandlerProcess) SetTrapExit(trap bool)

SetTrapExit is overridden to disable trapping exits for the WebHandlerProcess.

type WebHandlerStatus

type WebHandlerStatus error
var (
	WebHandlerStatusDone WebHandlerStatus = nil
	WebHandlerStatusWait WebHandlerStatus = fmt.Errorf("wait")
)

type WebMessageRequest

type WebMessageRequest struct {
	Ref      etf.Ref
	Request  *http.Request
	Response http.ResponseWriter
}

type WebOptions

type WebOptions struct {
	Host    string
	Port    uint16 // default port 8080, for TLS - 8443
	TLS     *tls.Config
	Handler http.Handler
}

type WebProcess

type WebProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*WebProcess) StartWebHandler

func (wp *WebProcess) StartWebHandler(web WebHandlerBehavior, options WebHandlerOptions) http.Handler

type WebStatus

type WebStatus error
var (
	WebStatusOK   WebStatus // nil
	WebStatusStop WebStatus = fmt.Errorf("stop")
)
