Documentation ¶
Index ¶
- Constants
- Variables
- type Application
- type ApplicationBehavior
- type ApplicationChildSpec
- type ApplicationInfo
- type ApplicationSpec
- type ApplicationStartType
- type MessageDirectChildren
- type MessageDown
- type MessageExit
- type MessageManageRPC
- type MessageNodeDown
- type MessageSagaCancel
- type MessageSagaError
- type Monitor
- type Process
- type ProcessBehavior
- type ProcessChannels
- type ProcessDirectMessage
- type ProcessGracefulExitRequest
- type ProcessID
- type ProcessInfo
- type ProcessMailboxMessage
- type ProcessOptions
- type ProcessState
- type RPC
- type RegisteredBehavior
- type Registrar
- type RemoteSpawnOptions
- type RemoteSpawnRequest
- type Saga
- func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
- func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus
- func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, ...) SagaStatus
- func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
- func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleSagaDirect(process *SagaProcess, message interface{}) (interface{}, error)
- func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus
- func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)
- func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, ...) SagaStatus
- func (gs *Saga) Init(process *ServerProcess, args ...etf.Term) error
- func (gs *Saga) SetMaxTransactions(process Process, max uint) error
- type SagaBehavior
- type SagaJob
- type SagaJobID
- type SagaJobOptions
- type SagaNext
- type SagaNextID
- type SagaOptions
- type SagaProcess
- func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error
- func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error
- func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)
- func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error
- func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error
- func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)
- func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID
- type SagaStatus
- type SagaTransaction
- type SagaTransactionID
- type SagaTransactionOptions
- type SagaWorker
- func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
- func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})
- func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, message interface{}) (interface{}, error)
- func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
- func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error
- func (w *SagaWorker) Terminate(process *ServerProcess, reason string)
- type SagaWorkerBehavior
- type SagaWorkerProcess
- type Server
- func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Server) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
- func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Server) Init(process *ServerProcess, args ...etf.Term) error
- func (gs *Server) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
- func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string
- func (gs *Server) Terminate(process *ServerProcess, reason string)
- type ServerBehavior
- type ServerFrom
- type ServerProcess
- func (sp *ServerProcess) Call(to interface{}, message etf.Term) (etf.Term, error)
- func (sp *ServerProcess) CallRPC(node, module, function string, args ...etf.Term) (etf.Term, error)
- func (sp *ServerProcess) CallRPCWithTimeout(timeout int, node, module, function string, args ...etf.Term) (etf.Term, error)
- func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)
- func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error
- func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc
- func (sp *ServerProcess) CastRPC(node, module, function string, args ...etf.Term) error
- func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error
- type ServerStatus
- type Stage
- func (s *Stage) DisableAutoDemand(p Process, subscription StageSubscription) error
- func (s *Stage) DisableForwardDemand(p Process) error
- func (s *Stage) EnableAutoDemand(p Process, subscription StageSubscription) error
- func (s *Stage) EnableForwardDemand(p Process) error
- func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus
- func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus
- func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)
- func (gst *Stage) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
- func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus
- func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleStageDirect(process *StageProcess, message interface{}) (interface{}, error)
- func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, ...) StageStatus
- func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, ...) (bool, StageStatus)
- func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error
- func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error
- func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error
- type StageBehavior
- type StageCancelMode
- type StageCancelReason
- type StageDispatchItem
- type StageDispatcher
- type StageDispatcherBehavior
- type StageOptions
- type StageProcess
- func (p *StageProcess) Ask(subscription StageSubscription, count uint) error
- func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error
- func (p *StageProcess) SendEvents(events etf.List) error
- func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)
- type StageStatus
- type StageSubscribeOptions
- type StageSubscription
- type Supervisor
- type SupervisorBehavior
- type SupervisorChildSpec
- type SupervisorSpec
- type SupervisorStrategy
- type SupervisorStrategyRestart
- type SupervisorStrategyType
Constants ¶
const ( // ApplicationStartPermanent If a permanent application terminates, // all other applications and the runtime system (node) are also terminated. ApplicationStartPermanent = "permanent" // ApplicationStartTemporary If a temporary application terminates, // this is reported but no other applications are terminated. ApplicationStartTemporary = "temporary" // ApplicationStartTransient If a transient application terminates // with reason normal, this is reported but no other applications are // terminated. If a transient application terminates abnormally, that // is with any other reason than normal, all other applications and // the runtime system (node) are also terminated. ApplicationStartTransient = "transient" )
const ( // SupervisorRestartIntensity SupervisorRestartIntensity = uint16(10) // SupervisorRestartPeriod SupervisorRestartPeriod = uint16(10) // SupervisorStrategyOneForOne If one child process terminates and is to be restarted, only // that child process is affected. This is the default restart strategy. SupervisorStrategyOneForOne = SupervisorStrategyType("one_for_one") // SupervisorStrategyOneForAll If one child process terminates and is to be restarted, all other // child processes are terminated and then all child processes are restarted. SupervisorStrategyOneForAll = SupervisorStrategyType("one_for_all") // SupervisorStrategyRestForOne If one child process terminates and is to be restarted, // the 'rest' of the child processes (that is, the child // processes after the terminated child process in the start order) // are terminated. Then the terminated child process and all // child processes after it are restarted SupervisorStrategyRestForOne = SupervisorStrategyType("rest_for_one") // SupervisorStrategySimpleOneForOne A simplified one_for_one supervisor, where all // child processes are dynamically added instances // of the same process type, that is, running the same code. SupervisorStrategySimpleOneForOne = SupervisorStrategyType("simple_one_for_one") // SupervisorStrategyRestartPermanent child process is always restarted SupervisorStrategyRestartPermanent = SupervisorStrategyRestart("permanent") // SupervisorStrategyRestartTemporary child process is never restarted // (not even when the supervisor restart strategy is rest_for_one // or one_for_all and a sibling death causes the temporary process // to be terminated) SupervisorStrategyRestartTemporary = SupervisorStrategyRestart("temporary") // SupervisorStrategyRestartTransient child process is restarted only if // it terminates abnormally, that is, with an exit reason other // than normal, shutdown. SupervisorStrategyRestartTransient = SupervisorStrategyRestart("transient") )
const (
DefaultCallTimeout = 5
)
Variables ¶
var ( SagaStatusOK SagaStatus // nil SagaStatusStop SagaStatus = fmt.Errorf("stop") ErrSagaTxEndOfLifespan = fmt.Errorf("End of TX lifespan") ErrSagaTxNextTimeout = fmt.Errorf("Next saga timeout") ErrSagaUnknown = fmt.Errorf("Unknown saga") ErrSagaJobUnknown = fmt.Errorf("Unknown job") ErrSagaTxUnknown = fmt.Errorf("Unknown TX") ErrSagaTxCanceled = fmt.Errorf("Tx is canceled") ErrSagaTxInProgress = fmt.Errorf("Tx is still in progress") ErrSagaResultAlreadySent = fmt.Errorf("Result is already sent") ErrSagaNotAllowed = fmt.Errorf("Operation is not allowed") )
var ( ErrUnsupportedRequest = fmt.Errorf("Unsupported request") ErrServerTerminated = fmt.Errorf("Server terminated") )
Functions ¶
This section is empty.
Types ¶
type Application ¶
type Application struct{}
Application is an implementation of the ProcessBehavior interface
func (*Application) ProcessInit ¶
func (a *Application) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
func (*Application) ProcessLoop ¶
func (a *Application) ProcessLoop(ps ProcessState, started chan<- bool) string
type ApplicationBehavior ¶
type ApplicationBehavior interface { ProcessBehavior Load(args ...etf.Term) (ApplicationSpec, error) Start(process Process, args ...etf.Term) }
ApplicationBehavior interface
type ApplicationChildSpec ¶
type ApplicationChildSpec struct { Child ProcessBehavior Name string Args []etf.Term // contains filtered or unexported fields }
type ApplicationInfo ¶
type ApplicationSpec ¶
type ApplicationStartType ¶
type ApplicationStartType = string
type MessageDirectChildren ¶
type MessageDirectChildren struct{}
type MessageDown ¶
type MessageDown struct { Ref etf.Ref // a monitor reference ProcessID ProcessID // if monitor was created by name Pid etf.Pid Reason string }
MessageDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorProcess. Reason values:
- the exit reason of the process
- 'noproc' (process did not exist at the time of monitor creation)
- 'noconnection' (no connection to the node where the monitored process resides)
func IsMessageDown ¶
func IsMessageDown(message etf.Term) (MessageDown, bool)
type MessageExit ¶
MessageExit is delivered to the Server's HandleInfo callback when exit trapping is enabled using SetTrapExit(true)
func IsMessageExit ¶
func IsMessageExit(message etf.Term) (MessageExit, bool)
type MessageManageRPC ¶
MessageManageRPC is used to manage the RPC feature provided by the "rex" process
type MessageNodeDown ¶
type MessageNodeDown struct {
Name string
}
MessageNodeDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode
type MessageSagaCancel ¶
type MessageSagaCancel struct { TransactionID SagaTransactionID NextID SagaNextID Reason string }
type MessageSagaError ¶
type MessageSagaError struct { TransactionID SagaTransactionID NextID SagaNextID Error string Details string }
type Process ¶
type Process interface { Registrar // Spawn create a new process with parent Spawn(name string, opts ProcessOptions, object ProcessBehavior, args ...etf.Term) (Process, error) // RemoteSpawn creates a new process at a remote node. The object name is a regitered behavior on a remote name using RegisterBehavior(...). Init callback of the started remote process will receive gen.RemoteSpawnRequest as an argument. RemoteSpawn(node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) // Name returns process name used on starting. Name() string // RegisterName register associates the name with pid (not overrides registered name on starting) RegisterName(name string) error // UnregisterName unregister named process. Unregistering name is allowed to the owner only UnregisterName(name string) error // Info returns process details Info() ProcessInfo // Self returns registered process identificator belongs to the process Self() etf.Pid // Direct make a direct request to the actor (gen.Application, gen.Supervisor, gen.Server or // inherited from gen.Server actor) with default timeout 5 seconds Direct(request interface{}) (interface{}, error) // DirectWithTimeout make a direct request to the actor with the given timeout (in seconds) DirectWithTimeout(request interface{}, timeout int) (interface{}, error) // Send sends a message in fashion of 'erlang:send'. The value of 'to' can be a Pid, registered local name // or gen.ProcessID{RegisteredName, NodeName} Send(to interface{}, message etf.Term) error // SendAfter starts a timer. When the timer expires, the message sends to the process // identified by 'to'. 'to' can be a Pid, registered local name or // gen.ProcessID{RegisteredName, NodeName}. 
Returns cancel function in order to discard // sending a message SendAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc // Exit initiate a graceful stopping process Exit(reason string) error // Kill immidiately stops process Kill() // CreateAlias creates a new alias for the Process CreateAlias() (etf.Alias, error) // DeleteAlias deletes the given alias DeleteAlias(alias etf.Alias) error // ListEnv returns a map of configured environment variables. // It also includes environment variables from the GroupLeader and Parent. // which are overlapped by priority: Process(Parent(GroupLeader)) ListEnv() map[string]interface{} // SetEnv set environment variable with given name. Use nil value to remove variable with given name. SetEnv(name string, value interface{}) // Env returns value associated with given environment name. Env(name string) interface{} // Wait waits until process stopped Wait() // WaitWithTimeout waits until process stopped. Return ErrTimeout // if given timeout is exceeded WaitWithTimeout(d time.Duration) error // Link creates a link between the calling process and another process. // Links are bidirectional and there can only be one link between two processes. // Repeated calls to Process.Link(Pid) have no effect. If one of the participants // of a link terminates, it will send an exit signal to the other participant and caused // termination of the last one (if this process hasn't set a trap using Process.SetTrapExit(true)). Link(with etf.Pid) // Unlink removes the link, if there is one, between the calling process and // the process referred to by Pid. Unlink(with etf.Pid) // IsAlive returns whether the process is alive IsAlive() bool // SetTrapExit enables/disables the trap on terminate process. When a process is trapping exits, // it will not terminate when an exit signal is received. Instead, the signal is transformed // into a 'gen.MessageExit' which is put into the mailbox of the process just like a regular message. 
SetTrapExit(trap bool) // TrapExit returns whether the trap was enabled on this process TrapExit() bool // MonitorNode creates monitor between the current process and node. If Node fails or does not exist, // the message {nodedown, Node} is delivered to the process. MonitorNode(name string) etf.Ref // DemonitorNode removes monitor. Returns false if the given reference wasn't found DemonitorNode(ref etf.Ref) bool // MonitorProcess creates monitor between the processes. // Allowed types for the 'process' value: etf.Pid, gen.ProcessID // When a process monitor is triggered, a MessageDown sends to the caller. // Note: The monitor request is an asynchronous signal. That is, it takes // time before the signal reaches its destination. MonitorProcess(process interface{}) etf.Ref // DemonitorProcess removes monitor. Returns false if the given reference wasn't found DemonitorProcess(ref etf.Ref) bool // Behavior returns the object this process runs on. Behavior() ProcessBehavior // GroupLeader returns group leader process. Usually it points to the application process. GroupLeader() Process // Parent returns parent process. It returns nil if this process was spawned using Node.Spawn. Parent() Process // Context returns process context. Context() context.Context // Children returns list of children pid (Application, Supervisor) Children() ([]etf.Pid, error) // Links returns list of the process pids this process has linked to. Links() []etf.Pid // Monitors returns list of monitors created this process by pid. Monitors() []etf.Pid // Monitors returns list of monitors created this process by name. MonitorsByName() []ProcessID // MonitoredBy returns list of process pids monitored this process. MonitoredBy() []etf.Pid // Aliases returns list of aliases of this process. 
Aliases() []etf.Alias SendSyncRequestRaw(ref etf.Ref, node etf.Atom, messages ...etf.Term) error PutSyncReply(ref etf.Ref, term etf.Term) error SendSyncRequest(ref etf.Ref, to interface{}, message etf.Term) error WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) ProcessChannels() ProcessChannels }
type ProcessBehavior ¶
type ProcessBehavior interface { ProcessInit(Process, ...etf.Term) (ProcessState, error) ProcessLoop(ProcessState, chan<- bool) string // method which implements control flow of process }
The ProcessBehavior interface contains the methods you should implement to create your own process behavior
type ProcessChannels ¶
type ProcessChannels struct { Mailbox <-chan ProcessMailboxMessage Direct <-chan ProcessDirectMessage GracefulExit <-chan ProcessGracefulExitRequest }
type ProcessDirectMessage ¶
type ProcessDirectMessage struct { Message interface{} Err error Reply chan ProcessDirectMessage }
type ProcessInfo ¶
type ProcessInfo struct { PID etf.Pid Name string CurrentFunction string Status string MessageQueueLen int Links []etf.Pid Monitors []etf.Pid MonitorsByName []ProcessID MonitoredBy []etf.Pid Aliases []etf.Alias Dictionary etf.Map TrapExit bool GroupLeader etf.Pid Reductions uint64 }
ProcessInfo struct with process details
type ProcessMailboxMessage ¶
type ProcessOptions ¶
type ProcessOptions struct { // Context allows mixing the system context with the custom one. E.g. to limit // the lifespan using context.WithTimeout Context context.Context // MailboxSize defines the length of message queue for the process MailboxSize uint16 // GroupLeader GroupLeader Process // Env sets the process environment variables Env map[string]interface{} }
type ProcessState ¶
type ProcessState struct { Process State interface{} }
type RegisteredBehavior ¶
type RegisteredBehavior struct { Behavior ProcessBehavior Data interface{} }
type Registrar ¶ added in v1.999.200
type Registrar interface { Monitor NodeName() string NodeStop() // ProcessByName returns Process struct for the given name. // Returns nil if it doesn't exist (not found) ProcessByName(name string) Process // ProcessByPid returns Process struct for the given Pid. // Returns nil if it doesn't exist (not found) ProcessByPid(pid etf.Pid) Process // ProcessByAlias returns Process struct for the given alias. // Returns nil if it doesn't exist (not found) ProcessByAlias(alias etf.Alias) Process // ProcessInfo returns the details about given Pid ProcessInfo(pid etf.Pid) (ProcessInfo, error) ProcessList() []Process IsAlias(etf.Alias) bool MakeRef() etf.Ref // IsProcessAlive returns true if the process with given pid is alive IsProcessAlive(process Process) bool RegisterBehavior(group, name string, behavior ProcessBehavior, data interface{}) error RegisteredBehavior(group, name string) (RegisteredBehavior, error) RegisteredBehaviorGroup(group string) []RegisteredBehavior UnregisterBehavior(group, name string) error }
type RemoteSpawnOptions ¶
type RemoteSpawnOptions struct { // RegisterName RegisterName string // Monitor enables monitor on the spawned process using provided reference Monitor etf.Ref // Link enables link between the calling and spawned processes Link bool // Function in order to support {M,F,A} request to the Erlang node Function string // Timeout Timeout int }
RemoteSpawnOptions defines options for RemoteSpawn method
type RemoteSpawnRequest ¶
type RemoteSpawnRequest struct { // Ref request id Ref etf.Ref // PID of the process made RemoteSpawn request From etf.Pid // Function provided via RemoteSpawnOptions.Function Function string }
RemoteSpawnRequest is stored in the process environment ("ergo:RemoteSpawnRequest") if the process was spawned by a RemoteSpawn request
type Saga ¶
type Saga struct {
Server
}
func (*Saga) HandleCall ¶
func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Saga) HandleCast ¶
func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*Saga) HandleDirect ¶
func (gs *Saga) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
func (*Saga) HandleInfo ¶
func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*Saga) HandleJobFailed ¶
func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus
func (*Saga) HandleJobInterim ¶
func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus
func (*Saga) HandleJobResult ¶
func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
func (*Saga) HandleSagaCall ¶
func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Saga) HandleSagaCast ¶
func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
func (*Saga) HandleSagaDirect ¶
func (gs *Saga) HandleSagaDirect(process *SagaProcess, message interface{}) (interface{}, error)
func (*Saga) HandleSagaInfo ¶
func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
func (*Saga) HandleTxCommit ¶
func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus
func (*Saga) HandleTxDone ¶
func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)
func (*Saga) HandleTxInterim ¶
func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus
type SagaBehavior ¶
type SagaBehavior interface { // InitSaga InitSaga(process *SagaProcess, args ...etf.Term) (SagaOptions, error) // HandleTxNew invokes on a new TX receiving by this saga. HandleTxNew(process *SagaProcess, id SagaTransactionID, value interface{}) SagaStatus // HandleTxResult invoked on a receiving result from the next saga HandleTxResult(process *SagaProcess, id SagaTransactionID, from SagaNextID, result interface{}) SagaStatus // HandleTxCancel invoked on a request of transaction cancelation. HandleTxCancel(process *SagaProcess, id SagaTransactionID, reason string) SagaStatus // HandleTxDone invoked when the transaction is done on a saga where it was created. // It returns the final result and SagaStatus. The commit message will deliver the final // result to all participants of this transaction (if it has enabled the TwoPhaseCommit option). // Otherwise the final result will be ignored. HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus) // HandleTxInterim invoked if received interim result from the next hop HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus // HandleTxCommit invoked if TwoPhaseCommit option is enabled for the given TX. // All sagas involved in this TX receive a commit message with final value and invoke this callback. // The final result has a value returned by HandleTxDone on a Saga created this TX. HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus // HandleJobResult HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus // HandleJobInterim HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus // HandleJobFailed HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus // HandleSagaCall this callback is invoked on ServerProcess.Call. 
This method is optional // for the implementation HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleSagaCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus // HandleSagaInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus // HandleSagaDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleSagaDirect(process *SagaProcess, message interface{}) (interface{}, error) }
SagaBehavior interface
type SagaJob ¶
type SagaJob struct { ID SagaJobID TransactionID SagaTransactionID Value interface{} // contains filtered or unexported fields }
type SagaJobOptions ¶
type SagaJobOptions struct {
Timeout uint
}
type SagaNext ¶
type SagaNext struct { // Saga etf.Pid, string (for the locally registered process), gen.ProcessID{process, node} (for the remote process) Saga interface{} // Value a value for the invoking HandleTxNew on a next hop. Value interface{} // Timeout how long this Saga will be waiting for the result from the next hop. Default - 10 seconds Timeout uint // TrapCancel if the next saga fails, it will transform the cancel signal into the regular message gen.MessageSagaCancel, and HandleSagaInfo callback will be invoked. TrapCancel bool // contains filtered or unexported fields }
type SagaNextID ¶
func (SagaNextID) String ¶
func (id SagaNextID) String() string
type SagaOptions ¶
type SagaOptions struct { // MaxTransactions defines the limit for the number of active transactions. Default: 0 (unlimited) MaxTransactions uint // Worker Worker SagaWorkerBehavior }
type SagaProcess ¶
type SagaProcess struct { ServerProcess // contains filtered or unexported fields }
func (*SagaProcess) CancelJob ¶
func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error
func (*SagaProcess) CancelTransaction ¶
func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error
func (*SagaProcess) Next ¶
func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)
func (*SagaProcess) SendInterim ¶
func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error
func (*SagaProcess) SendResult ¶
func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error
func (*SagaProcess) StartJob ¶
func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)
func (*SagaProcess) StartTransaction ¶
func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID
type SagaStatus ¶
type SagaStatus error
type SagaTransaction ¶
type SagaTransactionID ¶
func (SagaTransactionID) String ¶
func (id SagaTransactionID) String() string
type SagaTransactionOptions ¶
type SagaTransactionOptions struct { // HopLimit defines the number of hops within the transaction. Default limit // is 0 (no limit). HopLimit uint // Lifespan defines a lifespan for the transaction in seconds. Default is 60. Lifespan uint // TwoPhaseCommit enables 2PC for the transaction. This option makes all // Sagas involved in this transaction invoke HandleTxCommit callback on them and // invoke HandleJobCommit callback on Worker processes once the transaction is finished. TwoPhaseCommit bool }
type SagaWorker ¶
type SagaWorker struct {
Server
}
func (*SagaWorker) HandleCall ¶
func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*SagaWorker) HandleCast ¶
func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*SagaWorker) HandleDirect ¶
func (w *SagaWorker) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
func (*SagaWorker) HandleInfo ¶
func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*SagaWorker) HandleJobCommit ¶
func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})
default callbacks
func (*SagaWorker) HandleWorkerCall ¶
func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*SagaWorker) HandleWorkerCast ¶
func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
func (*SagaWorker) HandleWorkerDirect ¶
func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, message interface{}) (interface{}, error)
func (*SagaWorker) HandleWorkerInfo ¶
func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
func (*SagaWorker) HandleWorkerTerminate ¶
func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
func (*SagaWorker) Init ¶
func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error
func (*SagaWorker) Terminate ¶
func (w *SagaWorker) Terminate(process *ServerProcess, reason string)
type SagaWorkerBehavior ¶
type SagaWorkerBehavior interface { ServerBehavior // HandleJobStart invoked on a worker start HandleJobStart(process *SagaWorkerProcess, job SagaJob) error // HandleJobCancel invoked if transaction was canceled before the termination. HandleJobCancel(process *SagaWorkerProcess, reason string) // HandleJobCommit invoked if this job was a part of the transaction // with enabled TwoPhaseCommit option. All workers involved in this TX // handling are receiving this call. Callback invoked before the termination. HandleJobCommit(process *SagaWorkerProcess, final interface{}) // HandleWorkerInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus // HandleWorkerCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus // HandleWorkerCall this callback is invoked on ServerProcess.Call. This method is optional // for the implementation HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleWorkerDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleWorkerDirect(process *SagaWorkerProcess, message interface{}) (interface{}, error) // HandleWorkerTerminate this callback invoked on a process termination HandleWorkerTerminate(process *SagaWorkerProcess, reason string) }
type SagaWorkerProcess ¶
type SagaWorkerProcess struct { ServerProcess // contains filtered or unexported fields }
func (*SagaWorkerProcess) SendInterim ¶
func (wp *SagaWorkerProcess) SendInterim(interim interface{}) error
SendInterim
func (*SagaWorkerProcess) SendResult ¶
func (wp *SagaWorkerProcess) SendResult(result interface{}) error
SendResult sends the result and terminates this worker if 2PC is disabled. Otherwise, the worker waits for a cancel/commit signal.
type Server ¶
type Server struct{}
Server is an implementation of the ProcessBehavior interface for Server objects.
func (*Server) HandleCall ¶
func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Server) HandleCast ¶
func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*Server) HandleDirect ¶
func (gs *Server) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
func (*Server) HandleInfo ¶
func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*Server) Init ¶
func (gs *Server) Init(process *ServerProcess, args ...etf.Term) error
default callbacks for Server interface
func (*Server) ProcessInit ¶
func (*Server) ProcessLoop ¶
func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string
func (*Server) Terminate ¶
func (gs *Server) Terminate(process *ServerProcess, reason string)
type ServerBehavior ¶
type ServerBehavior interface { ProcessBehavior // Init invoked on a start Server Init(process *ServerProcess, args ...etf.Term) error // HandleCast invoked if Server received message sent with ServerProcess.Cast. // Return ServerStatusStop to stop server with "normal" reason. Use ServerStatus(error) // for the custom reason HandleCast(process *ServerProcess, message etf.Term) ServerStatus // HandleCall invoked if Server got sync request using ServerProcess.Call HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleDirect invoked on a direct request made with Process.Direct HandleDirect(process *ServerProcess, message interface{}) (interface{}, error) // HandleInfo invoked if Server received message sent with Process.Send. HandleInfo(process *ServerProcess, message etf.Term) ServerStatus // Terminate invoked on a termination process. ServerProcess.State is not locked during // this callback. Terminate(process *ServerProcess, reason string) }
ServerBehavior interface
type ServerProcess ¶
type ServerProcess struct { ProcessState // contains filtered or unexported fields }
ServerProcess is the state of the Server process.
func (*ServerProcess) Call ¶
Call makes outgoing sync request in fashion of 'gen_server:call'. 'to' can be Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}. This method shouldn't be used outside of the actor. Use Direct method instead.
func (*ServerProcess) CallRPCWithTimeout ¶
func (sp *ServerProcess) CallRPCWithTimeout(timeout int, node, module, function string, args ...etf.Term) (etf.Term, error)
CallRPCWithTimeout evaluate rpc call with given node/MFA and timeout
func (*ServerProcess) CallWithTimeout ¶
func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)
CallWithTimeout makes outgoing sync request in fashion of 'gen_server:call' with given timeout. This method shouldn't be used outside of the actor. Use DirectWithTimeout method instead.
func (*ServerProcess) Cast ¶
func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error
Cast sends a message in fashion of 'gen_server:cast'. 'to' can be a Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}
func (*ServerProcess) CastAfter ¶
func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) context.CancelFunc
CastAfter a simple wrapper for Process.SendAfter to send a message in fashion of 'gen_server:cast'
func (*ServerProcess) CastRPC ¶
func (sp *ServerProcess) CastRPC(node, module, function string, args ...etf.Term) error
CastRPC evaluate rpc cast with given node/MFA
func (*ServerProcess) SendReply ¶
func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error
SendReply sends a reply message to the sender made ServerProcess.Call request. Useful for the case with dispatcher and pool of workers: Dispatcher process forwards Call requests (asynchronously) within a HandleCall callback to the worker(s) using ServerProcess.Cast or ServerProcess.Send but returns ServerStatusIgnore instead of ServerStatusOK; Worker process sends result using ServerProcess.SendReply method with 'from' value received from the Dispatcher.
type ServerStatus ¶
type ServerStatus error
var ( ServerStatusOK ServerStatus = nil ServerStatusStop ServerStatus = fmt.Errorf("stop") ServerStatusIgnore ServerStatus = fmt.Errorf("ignore") )
func ServerStatusStopWithReason ¶
func ServerStatusStopWithReason(s string) ServerStatus
type Stage ¶
type Stage struct {
Server
}
func (*Stage) DisableAutoDemand ¶ added in v1.999.200
func (s *Stage) DisableAutoDemand(p Process, subscription StageSubscription) error
DisableAutoDemand means that demand must be sent to producers explicitly using Ask method. This mode can be used when a special behavior is desired.
func (*Stage) DisableForwardDemand ¶ added in v1.999.200
DisableForwardDemand disables forwarding messages to the HandleDemand on a producer stage. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed.
func (*Stage) EnableAutoDemand ¶ added in v1.999.200
func (s *Stage) EnableAutoDemand(p Process, subscription StageSubscription) error
EnableAutoDemand enables auto demand mode (this is default mode for the consumer).
func (*Stage) EnableForwardDemand ¶ added in v1.999.200
EnableForwardDemand enables forwarding messages to the HandleDemand on a producer stage. This is default mode for the producer.
func (*Stage) HandleCall ¶
func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Stage) HandleCancel ¶
func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus
func (*Stage) HandleCanceled ¶
func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus
func (*Stage) HandleCast ¶
func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*Stage) HandleDemand ¶
func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)
func (*Stage) HandleDirect ¶
func (gst *Stage) HandleDirect(process *ServerProcess, message interface{}) (interface{}, error)
func (*Stage) HandleEvents ¶
func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus
func (*Stage) HandleInfo ¶
func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*Stage) HandleStageCall ¶
func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Stage) HandleStageCast ¶
func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
func (*Stage) HandleStageDirect ¶
func (gst *Stage) HandleStageDirect(process *StageProcess, message interface{}) (interface{}, error)
func (*Stage) HandleStageInfo ¶
func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
func (*Stage) HandleSubscribe ¶
func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus
func (*Stage) HandleSubscribed ¶
func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)
func (*Stage) Init ¶
func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error
gen.Server callbacks
func (*Stage) InitStage ¶
func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error
func (*Stage) SetCancelMode ¶
func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error
SetCancelMode defines how consumer will handle termination of the producer. There are 3 modes: StageCancelPermanent (default) - consumer exits when the producer cancels or exits StageCancelTransient - consumer exits only if reason is not normal, shutdown, or {shutdown, reason} StageCancelTemporary - never exits
type StageBehavior ¶
type StageBehavior interface { // InitStage InitStage(process *StageProcess, args ...etf.Term) (StageOptions, error) // HandleDemand this callback is invoked on a producer stage // The producer that implements this callback must either store the demand, or return the amount of requested events. HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus) // HandleEvents this callback is invoked on a consumer stage. HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus // HandleSubscribe This callback is invoked on a producer stage. HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus // HandleSubscribed this callback is invoked as a confirmation for the subscription request // Returning false means that demand must be sent to producers explicitly using Ask method. // Returning true means the stage implementation will take care of automatically sending. HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus) // HandleCancel // Invoked when a consumer is no longer subscribed to a producer (invoked on a producer stage) // The cancelReason will be a {Cancel: "cancel", Reason: _} if the reason for cancellation // was a Stage.Cancel call. Any other value means the cancellation reason was // due to an EXIT. HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus // HandleCanceled // Invoked when a consumer is no longer subscribed to a producer (invoked on a consumer stage) // Termination this stage depends on a cancel mode for the given subscription. For the cancel mode // StageCancelPermanent - this stage will be terminated right after this callback invoking. // For the cancel mode StageCancelTransient - it depends on a reason of subscription canceling. 
// Cancel mode StageCancelTemporary keeps this stage alive whether the reason could be. HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus // HandleStageCall this callback is invoked on ServerProcess.Call. This method is optional // for the implementation HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleStageDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleStageDirect(process *StageProcess, message interface{}) (interface{}, error) // HandleStageCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleStageCast(process *StageProcess, message etf.Term) ServerStatus // HandleStageInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus }
StageBehavior interface for the Stage implementation
type StageCancelMode ¶
type StageCancelMode uint
const ( StageCancelPermanent StageCancelMode = 0 StageCancelTransient StageCancelMode = 1 StageCancelTemporary StageCancelMode = 2 )
type StageCancelReason ¶
type StageDispatchItem ¶
type StageDispatchItem struct {
// contains filtered or unexported fields
}
type StageDispatcher ¶
type StageDispatcher int
type StageDispatcherBehavior ¶
type StageDispatcherBehavior interface { // InitStageDispatcher(opts) Init(opts StageOptions) (state interface{}) // Ask called every time a consumer sends demand Ask(state interface{}, subscription StageSubscription, count uint) // Cancel called every time a subscription is cancelled or the consumer goes down. Cancel(state interface{}, subscription StageSubscription) // Dispatch called every time a producer wants to dispatch an event. Dispatch(state interface{}, events etf.List) []StageDispatchItem // Subscribe called every time the producer gets a new subscriber Subscribe(state interface{}, subscription StageSubscription, opts StageSubscribeOptions) error }
StageDispatcherBehavior defines the interface for the dispatcher implementation. To create a custom dispatcher you should implement this interface and use it in StageOptions as a Dispatcher.
func CreateStageDispatcherBroadcast ¶
func CreateStageDispatcherBroadcast() StageDispatcherBehavior
CreateStageDispatcherBroadcast creates a dispatcher that accumulates demand from all consumers before broadcasting events to all of them. This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer. The demand is only sent upstream once all consumers ask for data.
func CreateStageDispatcherDemand ¶
func CreateStageDispatcherDemand() StageDispatcherBehavior
CreateStageDispatcherDemand creates a dispatcher that sends batches to the highest demand. This is the default dispatcher used by Stage. In order to avoid greedy consumers, it is recommended that all consumers have exactly the same maximum demand.
func CreateStageDispatcherPartition ¶
func CreateStageDispatcherPartition(n uint, hash func(etf.Term) int) StageDispatcherBehavior
CreateStageDispatcherPartition creates a dispatcher that sends events according to partitions. Number of partitions 'n' must be > 0. 'hash' should return a number within the range [0,n). If it returns a value outside of this range, the event is discarded. If 'hash' is nil, a random partition will be used for every event.
type StageOptions ¶
type StageOptions struct { // DisableForwarding. By default the demand is always forwarded to the HandleDemand callback. // When this option is set to 'true', demands are accumulated until forwarding is // enabled again using the EnableForwardDemand method DisableForwarding bool // BufferSize the size of the buffer to store events without demand. // default value = defaultDispatcherBufferSize BufferSize uint // BufferKeepLast defines whether the first or last entries should be // kept on the buffer in case the buffer size is exceeded. BufferKeepLast bool Dispatcher StageDispatcherBehavior }
StageOptions defines the producer configuration using Init callback. It will be ignored if it acts as a consumer only.
type StageProcess ¶
type StageProcess struct { ServerProcess // contains filtered or unexported fields }
func (*StageProcess) Ask ¶
func (p *StageProcess) Ask(subscription StageSubscription, count uint) error
Ask makes a demand request for the given subscription. This function must only be used in the cases when a consumer sets a subscription to manual mode using DisableAutoDemand
func (*StageProcess) Cancel ¶
func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error
Cancel
func (*StageProcess) SendEvents ¶
func (p *StageProcess) SendEvents(events etf.List) error
SendEvents sends events to the subscribers
func (*StageProcess) Subscribe ¶
func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)
Subscribe subscribes to the given producer. HandleSubscribed callback will be invoked on a consumer stage once a request for the subscription is sent. If something went wrong on a producer side the callback HandleCancel will be invoked with a reason of cancelation.
type StageStatus ¶
type StageStatus error
var ( StageStatusOK StageStatus = nil StageStatusStop StageStatus = fmt.Errorf("stop") StageStatusUnsupported StageStatus = fmt.Errorf("unsupported") StageStatusNotAProducer StageStatus = fmt.Errorf("not a producer") )
type StageSubscribeOptions ¶
type StageSubscribeOptions struct { MinDemand uint `etf:"min_demand"` MaxDemand uint `etf:"max_demand"` // The stage implementation will take care of automatically sending // demand to producer (as a default behavior). You can disable it // setting ManualDemand to true ManualDemand bool `etf:"manual"` // What should happened with consumer if producer has terminated // StageCancelPermanent the consumer exits when the producer cancels or exits. // StageCancelTransient the consumer exits only if reason is not "normal", // "shutdown", or {"shutdown", _} // StageCancelTemporary the consumer never exits Cancel StageCancelMode `etf:"cancel"` // Partition is defined the number of partition this subscription should belongs to. // This option uses in the DispatcherPartition Partition uint `etf:"partition"` // Extra is intended to be a custom set of options for the custom implementation // of StageDispatcherBehavior Extra etf.Term `etf:"extra"` }
type Supervisor ¶
type Supervisor struct{}
Supervisor is an implementation of the ProcessBehavior interface.
func (*Supervisor) ProcessInit ¶
func (sv *Supervisor) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
func (*Supervisor) ProcessLoop ¶
func (sv *Supervisor) ProcessLoop(ps ProcessState, started chan<- bool) string
func (*Supervisor) StartChild ¶
func (sv *Supervisor) StartChild(supervisor Process, name string, args ...etf.Term) (Process, error)
StartChild dynamically starts a child process with given name of child spec which is defined by Init call.
type SupervisorBehavior ¶
type SupervisorBehavior interface { ProcessBehavior Init(args ...etf.Term) (SupervisorSpec, error) }
SupervisorBehavior interface
type SupervisorChildSpec ¶
type SupervisorChildSpec struct { // Node to run child on remote node Node string Name string Child ProcessBehavior Args []etf.Term // contains filtered or unexported fields }
type SupervisorSpec ¶
type SupervisorSpec struct { Name string Children []SupervisorChildSpec Strategy SupervisorStrategy // contains filtered or unexported fields }
type SupervisorStrategy ¶
type SupervisorStrategy struct { Type SupervisorStrategyType Intensity uint16 Period uint16 Restart SupervisorStrategyRestart }
type SupervisorStrategyRestart ¶
type SupervisorStrategyRestart = string
type SupervisorStrategyType ¶
type SupervisorStrategyType = string