gen

package
v1.999.200 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2021 License: MIT Imports: 9 Imported by: 40

README

Generic behaviors

Server

Generic server behavior.

Supervisor

Generic supervisor behavior.

Application

Generic application behavior.

Stage

Generic stage behavior.

Saga

Generic saga behavior.

Documentation

Index

Constants

View Source
const (

	// ApplicationStartPermanent If a permanent application terminates,
	// all other applications and the runtime system (node) are also terminated.
	ApplicationStartPermanent = "permanent"

	// ApplicationStartTemporary If a temporary application terminates,
	// this is reported but no other applications are terminated.
	ApplicationStartTemporary = "temporary"

	// ApplicationStartTransient If a transient application terminates
	// with reason normal, this is reported but no other applications are
	// terminated. If a transient application terminates abnormally, that
	// is with any other reason than normal, all other applications and
	// the runtime system (node) are also terminated.
	ApplicationStartTransient = "transient"
)
View Source
const (

	// SupervisorRestartIntensity has the value 10.
	// NOTE(review): presumably the default maximum number of restarts tolerated
	// within SupervisorRestartPeriod — confirm against the supervisor implementation.
	SupervisorRestartIntensity = uint16(10)

	// SupervisorRestartPeriod has the value 10.
	// NOTE(review): presumably the default length of the restart-counting window;
	// the unit is not visible here — confirm against the supervisor implementation.
	SupervisorRestartPeriod = uint16(10)

	// SupervisorStrategyOneForOne If one child process terminates and is to be restarted, only
	// that child process is affected. This is the default restart strategy.
	SupervisorStrategyOneForOne = SupervisorStrategyType("one_for_one")

	// SupervisorStrategyOneForAll If one child process terminates and is to be restarted, all other
	// child processes are terminated and then all child processes are restarted.
	SupervisorStrategyOneForAll = SupervisorStrategyType("one_for_all")

	// SupervisorStrategyRestForOne If one child process terminates and is to be restarted,
	// the 'rest' of the child processes (that is, the child
	// processes after the terminated child process in the start order)
	// are terminated. Then the terminated child process and all
	// child processes after it are restarted.
	SupervisorStrategyRestForOne = SupervisorStrategyType("rest_for_one")

	// SupervisorStrategySimpleOneForOne A simplified one_for_one supervisor, where all
	// child processes are dynamically added instances
	// of the same process type, that is, running the same code.
	SupervisorStrategySimpleOneForOne = SupervisorStrategyType("simple_one_for_one")

	// SupervisorStrategyRestartPermanent child process is always restarted.
	SupervisorStrategyRestartPermanent = SupervisorStrategyRestart("permanent")

	// SupervisorStrategyRestartTemporary child process is never restarted
	// (not even when the supervisor restart strategy is rest_for_one
	// or one_for_all and a sibling death causes the temporary process
	// to be terminated).
	SupervisorStrategyRestartTemporary = SupervisorStrategyRestart("temporary")

	// SupervisorStrategyRestartTransient child process is restarted only if
	// it terminates abnormally, that is, with an exit reason other
	// than normal, shutdown.
	SupervisorStrategyRestartTransient = SupervisorStrategyRestart("transient")
)
View Source
const (
	// DefaultCallTimeout is the default timeout value (5). Process.Direct is
	// documented as using a default timeout of 5 seconds, so this value is
	// presumably expressed in seconds — confirm against the callers.
	DefaultCallTimeout = 5
)

Variables

View Source
var (
	// Predefined SagaStatus values: SagaStatusOK is left at its zero value (nil),
	// SagaStatusStop wraps the error "stop".
	SagaStatusOK   SagaStatus // nil
	SagaStatusStop SagaStatus = fmt.Errorf("stop")

	// Sentinel errors related to sagas, transactions (TX) and jobs. Each one is
	// a distinct error value created once at package initialization.
	ErrSagaTxEndOfLifespan   = fmt.Errorf("End of TX lifespan")
	ErrSagaTxNextTimeout     = fmt.Errorf("Next saga timeout")
	ErrSagaUnknown           = fmt.Errorf("Unknown saga")
	ErrSagaJobUnknown        = fmt.Errorf("Unknown job")
	ErrSagaTxUnknown         = fmt.Errorf("Unknown TX")
	ErrSagaTxCanceled        = fmt.Errorf("Tx is canceled")
	ErrSagaTxInProgress      = fmt.Errorf("Tx is still in progress")
	ErrSagaResultAlreadySent = fmt.Errorf("Result is already sent")
	ErrSagaNotAllowed        = fmt.Errorf("Operation is not allowed")
)
View Source
var (
	ErrUnsupportedRequest = fmt.Errorf("Unsupported request")
	ErrServerTerminated   = fmt.Errorf("Server terminated")
)

Functions

This section is empty.

Types

type Application

type Application struct{}

Application is an implementation of the ProcessBehavior interface.

func (*Application) ProcessInit

func (a *Application) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

func (*Application) ProcessLoop

func (a *Application) ProcessLoop(ps ProcessState, started chan<- bool) string

type ApplicationBehavior

type ApplicationBehavior interface {
	ProcessBehavior
	Load(args ...etf.Term) (ApplicationSpec, error)
	Start(process Process, args ...etf.Term)
}

ApplicationBehavior interface

type ApplicationChildSpec

type ApplicationChildSpec struct {
	Child ProcessBehavior
	Name  string
	Args  []etf.Term
	// contains filtered or unexported fields
}

type ApplicationInfo

type ApplicationInfo struct {
	Name        string
	Description string
	Version     string
	PID         etf.Pid
}

type ApplicationSpec

type ApplicationSpec struct {
	sync.Mutex
	Name         string
	Description  string
	Version      string
	Lifespan     time.Duration
	Applications []string
	Environment  map[string]interface{}
	Children     []ApplicationChildSpec
	Process      Process
	StartType    ApplicationStartType
}

type ApplicationStartType

type ApplicationStartType = string

type MessageDirectChildren

type MessageDirectChildren struct{}

type MessageDown

type MessageDown struct {
	Ref       etf.Ref   // a monitor reference
	ProcessID ProcessID // if monitor was created by name
	Pid       etf.Pid
	Reason    string
}

MessageDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorProcess. Reason values:

  • the exit reason of the process
  • 'noproc' (process did not exist at the time of monitor creation)
  • 'noconnection' (no connection to the node where the monitored process resides)

func IsMessageDown

func IsMessageDown(message etf.Term) (MessageDown, bool)

type MessageExit

type MessageExit struct {
	Pid    etf.Pid
	Reason string
}

MessageExit is delivered to the Server's HandleInfo callback when exit trapping is enabled using SetTrapExit(true).

func IsMessageExit

func IsMessageExit(message etf.Term) (MessageExit, bool)

type MessageManageRPC

type MessageManageRPC struct {
	Provide  bool
	Module   string
	Function string
	Fun      RPC
}

MessageManageRPC is used to manage the RPC feature provided by the "rex" process.

type MessageNodeDown

type MessageNodeDown struct {
	Name string
}

MessageNodeDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode.

type MessageSagaCancel

type MessageSagaCancel struct {
	TransactionID SagaTransactionID
	NextID        SagaNextID
	Reason        string
}

type MessageSagaError

type MessageSagaError struct {
	TransactionID SagaTransactionID
	NextID        SagaNextID
	Error         string
	Details       string
}

type Monitor added in v1.999.200

type Monitor interface {
	IsMonitor(ref etf.Ref) bool
}

type Process

type Process interface {
	Registrar

	// Spawn creates a new process with this process as its parent.
	Spawn(name string, opts ProcessOptions, object ProcessBehavior, args ...etf.Term) (Process, error)
	// RemoteSpawn creates a new process at a remote node. The object name is a registered behavior on a remote node using RegisterBehavior(...). Init callback of the started remote process will receive gen.RemoteSpawnRequest as an argument.
	RemoteSpawn(node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error)
	// Name returns process name used on starting.
	Name() string

	// RegisterName associates the name with the pid (it does not override the name registered on starting).
	RegisterName(name string) error

	// UnregisterName unregisters the named process. Unregistering a name is allowed to the owner only.
	UnregisterName(name string) error

	// Info returns process details.
	Info() ProcessInfo

	// Self returns the registered process identifier that belongs to the process.
	Self() etf.Pid

	// Direct makes a direct request to the actor (gen.Application, gen.Supervisor, gen.Server or
	// an actor inherited from gen.Server) with the default timeout of 5 seconds.
	Direct(request interface{}) (interface{}, error)

	// DirectWithTimeout makes a direct request to the actor with the given timeout (in seconds).
	DirectWithTimeout(request interface{}, timeout int) (interface{}, error)

	// Send sends a message in fashion of 'erlang:send'. The value of 'to' can be a Pid, registered local name
	// or gen.ProcessID{RegisteredName, NodeName}.
	Send(to interface{}, message etf.Term) error

	// SendAfter starts a timer. When the timer expires, the message is sent to the process
	// identified by 'to'. 'to' can be a Pid, registered local name or
	// gen.ProcessID{RegisteredName, NodeName}. Returns a cancel function in order to discard
	// sending the message.
	SendAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc

	// Exit initiates a graceful stop of the process.
	Exit(reason string) error

	// Kill immediately stops the process.
	Kill()

	// CreateAlias creates a new alias for the Process.
	CreateAlias() (etf.Alias, error)

	// DeleteAlias deletes the given alias.
	DeleteAlias(alias etf.Alias) error

	// ListEnv returns a map of configured environment variables.
	// It also includes environment variables from the GroupLeader and Parent,
	// which are overlapped by priority: Process(Parent(GroupLeader)).
	ListEnv() map[string]interface{}

	// SetEnv sets the environment variable with the given name. Use a nil value to remove the variable with the given name.
	SetEnv(name string, value interface{})

	// Env returns the value associated with the given environment name.
	Env(name string) interface{}

	// Wait waits until the process has stopped.
	Wait()

	// WaitWithTimeout waits until the process has stopped. Returns ErrTimeout
	// if the given timeout is exceeded.
	WaitWithTimeout(d time.Duration) error

	// Link creates a link between the calling process and another process.
	// Links are bidirectional and there can only be one link between two processes.
	// Repeated calls to Process.Link(Pid) have no effect. If one of the participants
	// of a link terminates, it will send an exit signal to the other participant and cause
	// its termination (unless that process has set a trap using Process.SetTrapExit(true)).
	Link(with etf.Pid)

	// Unlink removes the link, if there is one, between the calling process and
	// the process referred to by Pid.
	Unlink(with etf.Pid)

	// IsAlive returns whether the process is alive.
	IsAlive() bool

	// SetTrapExit enables/disables the trap on terminate process. When a process is trapping exits,
	// it will not terminate when an exit signal is received. Instead, the signal is transformed
	// into a 'gen.MessageExit' which is put into the mailbox of the process just like a regular message.
	SetTrapExit(trap bool)

	// TrapExit returns whether the trap was enabled on this process.
	TrapExit() bool

	// MonitorNode creates a monitor between the current process and a node. If Node fails or does not exist,
	// the message {nodedown, Node} is delivered to the process.
	MonitorNode(name string) etf.Ref

	// DemonitorNode removes the monitor. Returns false if the given reference wasn't found.
	DemonitorNode(ref etf.Ref) bool

	// MonitorProcess creates a monitor between the processes.
	// Allowed types for the 'process' value: etf.Pid, gen.ProcessID.
	// When a process monitor is triggered, a MessageDown is sent to the caller.
	// Note: The monitor request is an asynchronous signal. That is, it takes
	// time before the signal reaches its destination.
	MonitorProcess(process interface{}) etf.Ref

	// DemonitorProcess removes the monitor. Returns false if the given reference wasn't found.
	DemonitorProcess(ref etf.Ref) bool

	// Behavior returns the object this process runs on.
	Behavior() ProcessBehavior
	// GroupLeader returns the group leader process. Usually it points to the application process.
	GroupLeader() Process
	// Parent returns the parent process. It returns nil if this process was spawned using Node.Spawn.
	Parent() Process
	// Context returns the process context.
	Context() context.Context

	// Children returns the list of children pids (Application, Supervisor).
	Children() ([]etf.Pid, error)

	// Links returns the list of process pids this process is linked to.
	Links() []etf.Pid
	// Monitors returns the list of monitors created by this process by pid.
	Monitors() []etf.Pid
	// MonitorsByName returns the list of monitors created by this process by name.
	MonitorsByName() []ProcessID
	// MonitoredBy returns the list of pids of the processes monitoring this process.
	MonitoredBy() []etf.Pid
	// Aliases returns the list of aliases of this process.
	Aliases() []etf.Alias

	SendSyncRequestRaw(ref etf.Ref, node etf.Atom, messages ...etf.Term) error
	PutSyncReply(ref etf.Ref, term etf.Term) error
	SendSyncRequest(ref etf.Ref, to interface{}, message etf.Term) error
	WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error)
	ProcessChannels() ProcessChannels
}

type ProcessBehavior

type ProcessBehavior interface {
	ProcessInit(Process, ...etf.Term) (ProcessState, error)
	ProcessLoop(ProcessState, chan<- bool) string // method which implements control flow of process
}

ProcessBehavior interface contains the methods you should implement to create your own process behavior.

type ProcessChannels

type ProcessChannels struct {
	Mailbox      <-chan ProcessMailboxMessage
	Direct       <-chan ProcessDirectMessage
	GracefulExit <-chan ProcessGracefulExitRequest
}

type ProcessDirectMessage

type ProcessDirectMessage struct {
	Message interface{}
	Err     error
	Reply   chan ProcessDirectMessage
}

type ProcessGracefulExitRequest

type ProcessGracefulExitRequest struct {
	From   etf.Pid
	Reason string
}

type ProcessID

type ProcessID struct {
	Name string
	Node string
}

ProcessID is the long notation of a registered process: {process_name, node_name}.

type ProcessInfo

type ProcessInfo struct {
	PID             etf.Pid
	Name            string
	CurrentFunction string
	Status          string
	MessageQueueLen int
	Links           []etf.Pid
	Monitors        []etf.Pid
	MonitorsByName  []ProcessID
	MonitoredBy     []etf.Pid
	Aliases         []etf.Alias
	Dictionary      etf.Map
	TrapExit        bool
	GroupLeader     etf.Pid
	Reductions      uint64
}

ProcessInfo struct with process details

type ProcessMailboxMessage

type ProcessMailboxMessage struct {
	From    etf.Pid
	Message interface{}
}

type ProcessOptions

type ProcessOptions struct {
	// Context allows mixing the system context with a custom one, e.g. to limit
	// the lifespan using context.WithTimeout.
	Context context.Context
	// MailboxSize defines the length of the message queue for the process.
	MailboxSize uint16
	// GroupLeader is the group leader process for the new process.
	// NOTE(review): behavior when left nil is not visible here — confirm.
	GroupLeader Process
	// Env sets the process environment variables.
	Env map[string]interface{}
}

type ProcessState

type ProcessState struct {
	Process
	State interface{}
}

type RPC

type RPC func(...etf.Term) etf.Term

RPC defines rpc function type

type RegisteredBehavior

type RegisteredBehavior struct {
	Behavior ProcessBehavior
	Data     interface{}
}

type Registrar added in v1.999.200

type Registrar interface {
	Monitor

	NodeName() string
	NodeStop()

	// ProcessByName returns Process struct for the given name.
	// Returns nil if it doesn't exist (not found).
	ProcessByName(name string) Process
	// ProcessByPid returns Process struct for the given Pid.
	// Returns nil if it doesn't exist (not found).
	ProcessByPid(pid etf.Pid) Process

	// ProcessByAlias returns Process struct for the given alias.
	// Returns nil if it doesn't exist (not found).
	ProcessByAlias(alias etf.Alias) Process

	// ProcessInfo returns the details about the given Pid.
	ProcessInfo(pid etf.Pid) (ProcessInfo, error)
	ProcessList() []Process
	IsAlias(etf.Alias) bool
	MakeRef() etf.Ref

	// IsProcessAlive returns true if the given process is alive.
	IsProcessAlive(process Process) bool

	// RegisterBehavior registers a behavior under the given group and name.
	// Process.RemoteSpawn refers to behaviors registered this way by name.
	RegisterBehavior(group, name string, behavior ProcessBehavior, data interface{}) error
	RegisteredBehavior(group, name string) (RegisteredBehavior, error)
	RegisteredBehaviorGroup(group string) []RegisteredBehavior
	UnregisterBehavior(group, name string) error
}

type RemoteSpawnOptions

type RemoteSpawnOptions struct {
	// RegisterName
	// NOTE(review): presumably the name to register the spawned process with — confirm.
	RegisterName string
	// Monitor enables a monitor on the spawned process using the provided reference.
	Monitor etf.Ref
	// Link enables a link between the calling and spawned processes.
	Link bool
	// Function in order to support {M,F,A} request to the Erlang node.
	Function string
	// Timeout
	// NOTE(review): the unit is not visible here; other int timeouts in this
	// package are documented in seconds — confirm.
	Timeout int
}

RemoteSpawnOptions defines options for RemoteSpawn method

type RemoteSpawnRequest

type RemoteSpawnRequest struct {
	// Ref request id
	Ref etf.Ref
	// PID of the process made RemoteSpawn request
	From etf.Pid
	// Function provided via RemoteSpawnOptions.Function
	Function string
}

RemoteSpawnRequest is stored in the process environment ("ergo:RemoteSpawnRequest") if the process was spawned by a RemoteSpawn request.

type Saga

type Saga struct {
	Server
}

func (*Saga) HandleCall

func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Saga) HandleCast

func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*Saga) HandleDirect

func (gs *Saga) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)

func (*Saga) HandleInfo

func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*Saga) HandleJobFailed

func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus

func (*Saga) HandleJobInterim

func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus

func (*Saga) HandleJobResult

func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus

func (*Saga) HandleSagaCall

func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Saga) HandleSagaCast

func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus

func (*Saga) HandleSagaDirect

func (gs *Saga) HandleSagaDirect(process *SagaProcess, message interface{}) (interface{}, error)

func (*Saga) HandleSagaInfo

func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus

func (*Saga) HandleTxCommit

func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus

func (*Saga) HandleTxDone

func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)

func (*Saga) HandleTxInterim

func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus

func (*Saga) Init

func (gs *Saga) Init(process *ServerProcess, args ...etf.Term) error

func (*Saga) SetMaxTransactions

func (gs *Saga) SetMaxTransactions(process Process, max uint) error

SetMaxTransactions sets the maximum number of transactions for the saga.

type SagaBehavior

type SagaBehavior interface {

	// InitSaga returns the SagaOptions for this saga, or an error.
	InitSaga(process *SagaProcess, args ...etf.Term) (SagaOptions, error)

	// HandleTxNew is invoked when a new TX is received by this saga.
	HandleTxNew(process *SagaProcess, id SagaTransactionID, value interface{}) SagaStatus

	// HandleTxResult is invoked on receiving a result from the next saga.
	HandleTxResult(process *SagaProcess, id SagaTransactionID, from SagaNextID, result interface{}) SagaStatus

	// HandleTxCancel is invoked on a request of transaction cancellation.
	HandleTxCancel(process *SagaProcess, id SagaTransactionID, reason string) SagaStatus

	// HandleTxDone is invoked when the transaction is done on the saga where it was created.
	// It returns the final result and SagaStatus. The commit message will deliver the final
	// result to all participants of this transaction (if it has enabled the TwoPhaseCommit option).
	// Otherwise the final result will be ignored.
	HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)

	// HandleTxInterim is invoked if an interim result is received from the next hop.
	HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus

	// HandleTxCommit is invoked if the TwoPhaseCommit option is enabled for the given TX.
	// All sagas involved in this TX receive a commit message with the final value and invoke this callback.
	// The final result has the value returned by HandleTxDone on the Saga that created this TX.
	HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus

	// HandleJobResult
	// NOTE(review): presumably invoked with the result sent by the job identified by 'from' — confirm.
	HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
	// HandleJobInterim
	// NOTE(review): presumably invoked with an interim result sent by the job identified by 'from' — confirm.
	HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus
	// HandleJobFailed
	// NOTE(review): presumably invoked when the job identified by 'from' fails with 'reason' — confirm.
	HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus

	// HandleSagaCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation.
	HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleSagaCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation.
	HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
	// HandleSagaInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation.
	HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
	// HandleSagaDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation.
	HandleSagaDirect(process *SagaProcess, message interface{}) (interface{}, error)
}

SagaBehavior interface

type SagaJob

type SagaJob struct {
	ID            SagaJobID
	TransactionID SagaTransactionID
	Value         interface{}
	// contains filtered or unexported fields
}

type SagaJobID

type SagaJobID etf.Ref

func (SagaJobID) String

func (id SagaJobID) String() string

type SagaJobOptions

type SagaJobOptions struct {
	Timeout uint
}

type SagaNext

type SagaNext struct {
	// Saga etf.Pid, string (for the locally registered process), gen.ProcessID{process, node} (for the remote process)
	Saga interface{}
	// Value a value for the invoking HandleTxNew on a next hop.
	Value interface{}
	// Timeout how long this Saga will be waiting for the result from the next hop. Default - 10 seconds
	Timeout uint
	// TrapCancel if the next saga fails, it will transform the cancel signal into the regular message gen.MessageSagaCancel, and HandleSagaInfo callback will be invoked.
	TrapCancel bool
	// contains filtered or unexported fields
}

type SagaNextID

type SagaNextID etf.Ref

func (SagaNextID) String

func (id SagaNextID) String() string

type SagaOptions

type SagaOptions struct {
	// MaxTransactions defines the limit for the number of active transactions. Default: 0 (unlimited)
	MaxTransactions uint
	// Worker
	Worker SagaWorkerBehavior
}

type SagaProcess

type SagaProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*SagaProcess) CancelJob

func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error

func (*SagaProcess) CancelTransaction

func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error

func (*SagaProcess) Next

func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)

func (*SagaProcess) SendInterim

func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error

func (*SagaProcess) SendResult

func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error

func (*SagaProcess) StartJob

func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)

func (*SagaProcess) StartTransaction

func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID

type SagaStatus

type SagaStatus error

type SagaTransaction

type SagaTransaction struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type SagaTransactionID

type SagaTransactionID etf.Ref

func (SagaTransactionID) String

func (id SagaTransactionID) String() string

type SagaTransactionOptions

type SagaTransactionOptions struct {
	// HopLimit defines the number of hops within the transaction. Default limit
	// is 0 (no limit).
	HopLimit uint
	// Lifespan defines a lifespan for the transaction in seconds. Default is 60.
	Lifespan uint

	// TwoPhaseCommit enables 2PC for the transaction. This option makes all
	// Sagas involved in this transaction invoke the HandleTxCommit callback on them and
	// invoke the HandleJobCommit callback on Worker processes once the transaction is finished.
	TwoPhaseCommit bool
}

type SagaWorker

type SagaWorker struct {
	Server
}

func (*SagaWorker) HandleCall

func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*SagaWorker) HandleCast

func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*SagaWorker) HandleDirect

func (w *SagaWorker) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)

func (*SagaWorker) HandleInfo

func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*SagaWorker) HandleJobCommit

func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})

default callbacks

func (*SagaWorker) HandleWorkerCall

func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*SagaWorker) HandleWorkerCast

func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus

func (*SagaWorker) HandleWorkerDirect

func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, message interface{}) (interface{}, error)

func (*SagaWorker) HandleWorkerInfo

func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus

func (*SagaWorker) HandleWorkerTerminate

func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)

func (*SagaWorker) Init

func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error

func (*SagaWorker) Terminate

func (w *SagaWorker) Terminate(process *ServerProcess, reason string)

type SagaWorkerBehavior

type SagaWorkerBehavior interface {
	ServerBehavior

	// HandleJobStart invoked on a worker start
	HandleJobStart(process *SagaWorkerProcess, job SagaJob) error
	// HandleJobCancel invoked if transaction was canceled before the termination.
	HandleJobCancel(process *SagaWorkerProcess, reason string)

	// HandleJobCommit invoked if this job was a part of the transaction
	// with enabled TwoPhaseCommit option. All workers involved in this TX
	// handling are receiving this call. Callback invoked before the termination.
	HandleJobCommit(process *SagaWorkerProcess, final interface{})

	// HandleWorkerInfo this callback is invoked on Process.Send. This method is optional
	// for the implementation
	HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
	// HandleWorkerCast this callback is invoked on ServerProcess.Cast. This method is optional
	// for the implementation
	HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
	// HandleWorkerCall this callback is invoked on ServerProcess.Call. This method is optional
	// for the implementation
	HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleWorkerDirect this callback is invoked on Process.Direct. This method is optional
	// for the implementation
	HandleWorkerDirect(process *SagaWorkerProcess, message interface{}) (interface{}, error)

	// HandleWorkerTerminate this callback invoked on a process termination
	HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
}

type SagaWorkerProcess

type SagaWorkerProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*SagaWorkerProcess) SendInterim

func (wp *SagaWorkerProcess) SendInterim(interim interface{}) error

SendInterim

func (*SagaWorkerProcess) SendResult

func (wp *SagaWorkerProcess) SendResult(result interface{}) error

SendResult sends the result and terminates this worker if 2PC is disabled. Otherwise, the worker keeps waiting for a cancel/commit signal.

type Server

type Server struct{}

Server is an implementation of the ProcessBehavior interface for Server objects.

func (*Server) HandleCall

func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Server) HandleCast

func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*Server) HandleDirect

func (gs *Server) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)

func (*Server) HandleInfo

func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*Server) Init

func (gs *Server) Init(process *ServerProcess, args ...etf.Term) error

default callbacks for Server interface

func (*Server) ProcessInit

func (gs *Server) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

func (*Server) ProcessLoop

func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string

func (*Server) Terminate

func (gs *Server) Terminate(process *ServerProcess, reason string)

type ServerBehavior

type ServerBehavior interface {
	ProcessBehavior

	// Init invoked on a start Server
	Init(process *ServerProcess, args ...etf.Term) error

	// HandleCast invoked if Server received message sent with ServerProcess.Cast.
	// Return ServerStatusStop to stop server with "normal" reason. Use ServerStatus(error)
	// for the custom reason
	HandleCast(process *ServerProcess, message etf.Term) ServerStatus

	// HandleCall invoked if Server got sync request using ServerProcess.Call
	HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

	// HandleDirect invoked on a direct request made with Process.Direct
	HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)

	// HandleInfo invoked if Server received message sent with Process.Send.
	HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

	// Terminate invoked on a termination process. ServerProcess.State is not locked during
	// this callback.
	Terminate(process *ServerProcess, reason string)
}

ServerBehavior interface

type ServerFrom

type ServerFrom struct {
	Pid          etf.Pid
	Ref          etf.Ref
	ReplyByAlias bool
}

ServerFrom

type ServerProcess

type ServerProcess struct {
	ProcessState
	// contains filtered or unexported fields
}

ServerProcess is the state of the Server process.

func (*ServerProcess) Call

func (sp *ServerProcess) Call(to interface{}, message etf.Term) (etf.Term, error)

Call makes outgoing sync request in fashion of 'gen_server:call'. 'to' can be Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}. This method shouldn't be used outside of the actor. Use Direct method instead.

func (*ServerProcess) CallRPC

func (sp *ServerProcess) CallRPC(node, module, function string, args ...etf.Term) (etf.Term, error)

CallRPC evaluate rpc call with given node/MFA

func (*ServerProcess) CallRPCWithTimeout

func (sp *ServerProcess) CallRPCWithTimeout(timeout int, node, module, function string, args ...etf.Term) (etf.Term, error)

CallRPCWithTimeout evaluate rpc call with given node/MFA and timeout

func (*ServerProcess) CallWithTimeout

func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)

CallWithTimeout makes an outgoing sync request in fashion of 'gen_server:call' with the given timeout. This method shouldn't be used outside of the actor. Use the DirectWithTimeout method instead.

func (*ServerProcess) Cast

func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error

Cast sends a message in fashion of 'gen_server:cast'. 'to' can be a Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}

func (*ServerProcess) CastAfter

func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc

CastAfter a simple wrapper for Process.SendAfter to send a message in fashion of 'gen_server:cast'

func (*ServerProcess) CastRPC

func (sp *ServerProcess) CastRPC(node, module, function string, args ...etf.Term) error

CastRPC evaluates an RPC cast with the given node/MFA.

func (*ServerProcess) SendReply

func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error

SendReply sends a reply message to the sender that made a ServerProcess.Call request. This is useful for the case of a dispatcher with a pool of workers: the Dispatcher process forwards Call requests (asynchronously) within a HandleCall callback to the worker(s) using ServerProcess.Cast or ServerProcess.Send but returns ServerStatusIgnore instead of ServerStatusOK; the Worker process sends the result using the ServerProcess.SendReply method with the 'from' value received from the Dispatcher.

type ServerStatus

// ServerStatus is the status value returned by the Server callbacks
// (HandleCall, HandleInfo and others).
type ServerStatus error
var (
	// ServerStatusOK is the nil status.
	ServerStatusOK     ServerStatus = nil
	ServerStatusStop   ServerStatus = fmt.Errorf("stop")
	// ServerStatusIgnore is returned from HandleCall instead of ServerStatusOK
	// when the request was forwarded and the reply will be delivered later
	// with ServerProcess.SendReply.
	ServerStatusIgnore ServerStatus = fmt.Errorf("ignore")
)

func ServerStatusStopWithReason

func ServerStatusStopWithReason(s string) ServerStatus

type Stage

// Stage embeds Server. Its methods cover the gen.Server callbacks (Init,
// HandleCall, HandleCast, HandleDirect, HandleInfo) and the StageBehavior
// callbacks (HandleDemand, HandleEvents, HandleSubscribe and others).
type Stage struct {
	Server
}

func (*Stage) DisableAutoDemand added in v1.999.200

func (s *Stage) DisableAutoDemand(p Process, subscription StageSubscription) error

DisableAutoDemand means that demand must be sent to producers explicitly using Ask method. This mode can be used when a special behavior is desired.

func (*Stage) DisableForwardDemand added in v1.999.200

func (s *Stage) DisableForwardDemand(p Process) error

DisableForwardDemand disables forwarding messages to the HandleDemand on a producer stage. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed.

func (*Stage) EnableAutoDemand added in v1.999.200

func (s *Stage) EnableAutoDemand(p Process, subscription StageSubscription) error

EnableAutoDemand enables auto demand mode (this is default mode for the consumer).

func (*Stage) EnableForwardDemand added in v1.999.200

func (s *Stage) EnableForwardDemand(p Process) error

EnableForwardDemand enables forwarding messages to the HandleDemand on a producer stage. This is default mode for the producer.

func (*Stage) HandleCall

func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Stage) HandleCancel

func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus

func (*Stage) HandleCanceled

func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus

func (*Stage) HandleCast

func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus

func (*Stage) HandleDemand

func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)

func (*Stage) HandleDirect

func (gst *Stage) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)

func (*Stage) HandleEvents

func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus

func (*Stage) HandleInfo

func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus

func (*Stage) HandleStageCall

func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)

func (*Stage) HandleStageCast

func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus

func (*Stage) HandleStageDirect

func (gst *Stage) HandleStageDirect(process *StageProcess, message interface{}) (interface{}, error)

func (*Stage) HandleStageInfo

func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus

func (*Stage) HandleSubscribe

func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus

func (*Stage) HandleSubscribed

func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)

func (*Stage) Init

func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error

gen.Server callbacks

func (*Stage) InitStage

func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error

func (*Stage) SetCancelMode

func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error

SetCancelMode defines how the consumer will handle termination of the producer. There are 3 modes: StageCancelPermanent (default) - the consumer exits when the producer cancels or exits; StageCancelTransient - the consumer exits only if the reason is not normal, shutdown, or {shutdown, reason}; StageCancelTemporary - the consumer never exits.

type StageBehavior

type StageBehavior interface {

	// InitStage initializes the stage. The returned StageOptions define the
	// producer configuration; they are ignored if the stage acts as a consumer only.
	InitStage(process *StageProcess, args ...etf.Term) (StageOptions, error)

	// HandleDemand is invoked on a producer stage.
	// The producer that implements this callback must either store the demand
	// or return the requested amount of events.
	HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)

	// HandleEvents is invoked on a consumer stage.
	HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus

	// HandleSubscribe is invoked on a producer stage.
	HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus

	// HandleSubscribed is invoked as a confirmation of the subscription request.
	// Returning false means that demand must be sent to producers explicitly using the Ask method.
	// Returning true means the stage implementation will take care of sending demand automatically.
	HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)

	// HandleCancel is invoked on a producer stage when a consumer is no longer
	// subscribed to it.
	// The cancel reason will be a {Cancel: "cancel", Reason: _} if the reason for cancellation
	// was a Stage.Cancel call. Any other value means the cancellation was
	// due to an EXIT.
	HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus

	// HandleCanceled is invoked on a consumer stage when it is no longer
	// subscribed to a producer.
	// Termination of this stage depends on the cancel mode of the given subscription.
	// With StageCancelPermanent this stage is terminated right after this callback is invoked.
	// With StageCancelTransient it depends on the reason the subscription was canceled.
	// StageCancelTemporary keeps this stage alive whatever the reason is.
	HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus

	// HandleStageCall is invoked on ServerProcess.Call. Implementing this method
	// is optional.
	HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
	// HandleStageDirect is invoked on Process.Direct. Implementing this method
	// is optional.
	HandleStageDirect(process *StageProcess, message interface{}) (interface{}, error)
	// HandleStageCast is invoked on ServerProcess.Cast. Implementing this method
	// is optional.
	HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
	// HandleStageInfo is invoked on Process.Send. Implementing this method
	// is optional.
	HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
}

StageBehavior interface for the Stage implementation

type StageCancelMode

// StageCancelMode defines how a consumer handles termination of the producer.
// It is set per subscription (StageSubscribeOptions.Cancel, Stage.SetCancelMode).
type StageCancelMode uint
const (
	// StageCancelPermanent (default) the consumer exits when the producer cancels or exits.
	StageCancelPermanent StageCancelMode = 0
	// StageCancelTransient the consumer exits only if the reason is not normal,
	// shutdown, or {shutdown, reason}.
	StageCancelTransient StageCancelMode = 1
	// StageCancelTemporary the consumer never exits.
	StageCancelTemporary StageCancelMode = 2
)

type StageCancelReason

// StageCancelReason carries a cancel marker and a reason.
// NOTE(review): presumably this is the {Cancel: "cancel", Reason: _} value
// mentioned in the HandleCancel documentation — confirm.
type StageCancelReason struct {
	Cancel string
	Reason string
}

type StageDispatchItem

// StageDispatchItem is the element type of the slice returned by
// StageDispatcherBehavior.Dispatch.
type StageDispatchItem struct {
	// contains filtered or unexported fields
}

type StageDispatcher

type StageDispatcher int

type StageDispatcherBehavior

type StageDispatcherBehavior interface {
	// Init receives the StageOptions and returns the dispatcher state.
	// NOTE(review): presumably the returned value is what the other methods
	// receive as 'state' — confirm.
	Init(opts StageOptions) (state interface{})

	// Ask is called every time a consumer sends demand.
	Ask(state interface{}, subscription StageSubscription, count uint)

	// Cancel is called every time a subscription is cancelled or the consumer goes down.
	Cancel(state interface{}, subscription StageSubscription)

	// Dispatch is called every time a producer wants to dispatch events.
	Dispatch(state interface{}, events etf.List) []StageDispatchItem

	// Subscribe is called every time the producer gets a new subscriber.
	Subscribe(state interface{}, subscription StageSubscription, opts StageSubscribeOptions) error
}

StageDispatcherBehavior defined interface for the dispatcher implementation. To create a custom dispatcher you should implement this interface and use it in StageOptions as a Dispatcher

func CreateStageDispatcherBroadcast

func CreateStageDispatcherBroadcast() StageDispatcherBehavior

CreateStageDispatcherBroadcast creates a dispatcher that accumulates demand from all consumers before broadcasting events to all of them. This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer. The demand is only sent upstream once all consumers ask for data.

func CreateStageDispatcherDemand

func CreateStageDispatcherDemand() StageDispatcherBehavior

CreateStageDispatcherDemand creates a dispatcher that sends batches to the highest demand. This is the default dispatcher used by Stage. In order to avoid greedy consumers, it is recommended that all consumers have exactly the same maximum demand.

func CreateStageDispatcherPartition

func CreateStageDispatcherPartition(n uint, hash func(etf.Term) int) StageDispatcherBehavior

CreateStageDispatcherPartition creates a dispatcher that sends events according to partitions. The number of partitions 'n' must be > 0. 'hash' should return a number within the range [0,n). An event whose hash value is outside of this range is discarded. If 'hash' is nil, a random partition will be used for every event.

type StageOptions

type StageOptions struct {

	// DisableForwarding. By default the demand is always forwarded to the
	// HandleDemand callback. When this option is set to 'true', demands are
	// accumulated until forwarding is enabled again.
	// NOTE(review): the previous wording named a DisableDemandAccumulating method,
	// which is not part of the documented API; Stage.EnableForwardDemand looks
	// like the intended method — confirm.
	DisableForwarding bool

	// BufferSize is the size of the buffer used to store events without demand.
	// default value = defaultDispatcherBufferSize
	BufferSize uint

	// BufferKeepLast defines whether the first or last entries should be
	// kept in the buffer in case the buffer size is exceeded.
	BufferKeepLast bool

	// Dispatcher is the StageDispatcherBehavior implementation to use. Built-in
	// dispatchers are created with CreateStageDispatcherDemand (documented as the
	// default), CreateStageDispatcherBroadcast and CreateStageDispatcherPartition.
	Dispatcher StageDispatcherBehavior
}

StageOptions defines the producer configuration using Init callback. It will be ignored if it acts as a consumer only.

type StageProcess

// StageProcess is the value passed as 'process' to the StageBehavior
// callbacks. It embeds ServerProcess.
type StageProcess struct {
	ServerProcess
	// contains filtered or unexported fields
}

func (*StageProcess) Ask

func (p *StageProcess) Ask(subscription StageSubscription, count uint) error

Ask makes a demand request for the given subscription. This function must only be used in the cases when a consumer sets a subscription to manual mode using DisableAutoDemand

func (*StageProcess) Cancel

func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error

Cancel cancels the given subscription with the given reason.

func (*StageProcess) SendEvents

func (p *StageProcess) SendEvents(events etf.List) error

SendEvents sends events to the subscribers

func (*StageProcess) Subscribe

func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)

Subscribe subscribes to the given producer. HandleSubscribed callback will be invoked on a consumer stage once a request for the subscription is sent. If something went wrong on a producer side the callback HandleCancel will be invoked with a reason of cancelation.

type StageStatus

// StageStatus is the status value returned by the StageBehavior callbacks
// (HandleDemand, HandleEvents, HandleSubscribe and others).
type StageStatus error
var (
	// StageStatusOK is the nil status.
	StageStatusOK           StageStatus = nil
	StageStatusStop         StageStatus = fmt.Errorf("stop")
	StageStatusUnsupported  StageStatus = fmt.Errorf("unsupported")
	StageStatusNotAProducer StageStatus = fmt.Errorf("not a producer")
)

type StageSubscribeOptions

type StageSubscribeOptions struct {
	MinDemand uint `etf:"min_demand"`
	MaxDemand uint `etf:"max_demand"`
	// By default the stage implementation takes care of automatically sending
	// demand to the producer. You can disable this by setting
	// ManualDemand to true.
	ManualDemand bool `etf:"manual"`
	// Cancel defines what should happen to the consumer if the producer has terminated:
	// StageCancelPermanent the consumer exits when the producer cancels or exits.
	// StageCancelTransient the consumer exits only if the reason is not "normal",
	// "shutdown", or {"shutdown", _}
	// StageCancelTemporary the consumer never exits
	Cancel StageCancelMode `etf:"cancel"`

	// Partition defines the number of the partition this subscription should belong to.
	// This option is used by the partition dispatcher (see CreateStageDispatcherPartition).
	Partition uint `etf:"partition"`

	// Extra is intended to be a custom set of options for a custom implementation
	// of StageDispatcherBehavior
	Extra etf.Term `etf:"extra"`
}

type StageSubscription

// StageSubscription identifies a subscription. It is returned by
// StageProcess.Subscribe and passed to the StageBehavior callbacks and to
// StageProcess.Ask and StageProcess.Cancel.
type StageSubscription struct {
	Pid etf.Pid
	ID  etf.Ref
}

type Supervisor

type Supervisor struct{}

Supervisor is an implementation of the ProcessBehavior interface.

func (*Supervisor) ProcessInit

func (sv *Supervisor) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)

func (*Supervisor) ProcessLoop

func (sv *Supervisor) ProcessLoop(ps ProcessState, started chan<- bool) string

func (*Supervisor) StartChild

func (sv *Supervisor) StartChild(supervisor Process, name string, args ...etf.Term) (Process, error)

StartChild dynamically starts a child process with given name of child spec which is defined by Init call.

type SupervisorBehavior

type SupervisorBehavior interface {
	ProcessBehavior
	// Init returns the supervisor specification. The child specs it defines are
	// the ones Supervisor.StartChild refers to by name.
	Init(args ...etf.Term) (SupervisorSpec, error)
}

SupervisorBehavior interface

type SupervisorChildSpec

// SupervisorChildSpec describes one child of a supervisor; a list of them is
// carried in SupervisorSpec.Children.
type SupervisorChildSpec struct {
	// Node is the node to run the child on (used to run the child on a remote node).
	Node  string
	// Name is the name of the child spec.
	// NOTE(review): presumably this is the name Supervisor.StartChild looks up — confirm.
	Name  string
	Child ProcessBehavior
	Args  []etf.Term
	// contains filtered or unexported fields
}

type SupervisorSpec

// SupervisorSpec is the supervisor specification returned by
// SupervisorBehavior.Init.
type SupervisorSpec struct {
	Name     string
	Children []SupervisorChildSpec
	Strategy SupervisorStrategy
	// contains filtered or unexported fields
}

type SupervisorStrategy

// SupervisorStrategy is the restart strategy configuration carried in
// SupervisorSpec.Strategy.
type SupervisorStrategy struct {
	// Type is one of the SupervisorStrategy* strategy types, e.g.
	// SupervisorStrategyOneForOne or SupervisorStrategyOneForAll.
	Type      SupervisorStrategyType
	// NOTE(review): SupervisorRestartIntensity and SupervisorRestartPeriod look
	// like the defaults for the next two fields — confirm.
	Intensity uint16
	Period    uint16
	Restart   SupervisorStrategyRestart
}

type SupervisorStrategyRestart

type SupervisorStrategyRestart = string

type SupervisorStrategyType

// SupervisorStrategyType is a string alias naming the supervisor restart
// strategy, e.g. SupervisorStrategyOneForOne ("one_for_one").
type SupervisorStrategyType = string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL