Documentation ¶
Index ¶
- func NextState(pipeline *pipeline.Pipeline, state storage.ProcessState, ...) (newState storage.ProcessState, err error)
- type EnvelopeActorPair
- type Manager
- func (m *Manager) Close()
- func (m *Manager) Get(id api.UUID) (*Process, error)
- func (m *Manager) GetByTrigger(emitter api.Actor, envelopeID api.UUID) (*Process, error)
- func (m *Manager) GetEmitterEnvelopeStatus(emitter api.Actor, envelopeID api.UUID) api.Process_Status
- func (m *Manager) ListPostMortem(level api.LogLevel, status ...api.PMProcess_Status) ([]*Process, error)
- func (m *Manager) LookupOrCreate(emitter api.Actor, output string, envelopeID api.UUID) (*Process, error)
- func (m *Manager) Purge(id api.UUID) error
- func (m *Manager) PurgeMany(ids []api.UUID) ([]api.UUID, error)
- func (m *Manager) Replay(process *Process, envelopeStorage storage.EnvelopeStorage, match bool) (*Process, error)
- type Process
- func (p *Process) ActorProcessingEnd(actorID api.UUID, nodeID string, status api.ActorProcessingState_Status, ...)
- func (p Process) Cancel(reason string) error
- func (p *Process) Export(envelopeStorage storage.EnvelopeStorage, log zerolog.Logger) (map[string]interface{}, error)
- func (p *Process) ExportStream(ctx context.Context, envelopeStorage storage.EnvelopeStorage, ...) error
- func (p *Process) GetPostMortemState() (storage.PostMortemState, error)
- func (p *Process) GetResponse() api.UUID
- func (p Process) GetStatus() api.Process_Status
- func (p Process) GetTargets(output storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool) ([]api.EnvelopeTarget, error)
- func (p *Process) MatchPipeline(output string, eventTypes []string) error
- func (p Process) Pause() error
- func (p Process) Resume() error
- func (p *Process) SetPostMortemState(state storage.PostMortemState) error
- func (p *Process) UpdateReceiveStatus(envelopeID, actorID api.UUID, outputref storage.NodeOutputRef, ...) error
- type StateEngine
- func (e *StateEngine) EndProcessNode(node storage.ProcessNode, status api.ActorProcessingState_Status, ...)
- func (e *StateEngine) GetNodeState(node storage.ProcessNode) (storage.ProcessNodeState, storage.Node)
- func (e *StateEngine) GetState() storage.ProcessState
- func (e *StateEngine) HitInput(node storage.ProcessNode, input string, complete bool, closed bool, ...)
- func (e *StateEngine) HitOutput(node storage.ProcessNode, output string, status storage.ReceptionStatus, ...)
- func (e *StateEngine) SetNodeState(node storage.ProcessNode, state storage.ProcessNodeState)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NextState ¶
func NextState( pipeline *pipeline.Pipeline, state storage.ProcessState, logentry storage.ProcessLogEntry, log zerolog.Logger, ) (newState storage.ProcessState, err error)
NextState calculates the next state of a process after applying a log entry
Types ¶
type EnvelopeActorPair ¶
EnvelopeActorPair contains a pair (EnvelopeID, ActorID)
type Manager ¶
type Manager struct { PipelineManager *pipeline.Manager ActorStorage storage.ActorStorage ProcessStorage storage.ProcessStorage Log zerolog.Logger CurrentProcessList []*Process }
Manager manages the processes
func NewManager ¶
func NewManager( pipelineManager *pipeline.Manager, actorStorage storage.ActorStorage, processStorage storage.ProcessStorage, logger zerolog.Logger, ) *Manager
NewManager creates a process.Manager
func (*Manager) GetByTrigger ¶
GetByTrigger returns the process matching the given trigger emitter/envelope
func (*Manager) GetEmitterEnvelopeStatus ¶
func (m *Manager) GetEmitterEnvelopeStatus(emitter api.Actor, envelopeID api.UUID) api.Process_Status
GetEmitterEnvelopeStatus returns the status of the process triggered by the given emitter/envelope
func (*Manager) ListPostMortem ¶
func (m *Manager) ListPostMortem(level api.LogLevel, status ...api.PMProcess_Status) ([]*Process, error)
ListPostMortem returns dead processes
func (*Manager) LookupOrCreate ¶
func (m *Manager) LookupOrCreate(emitter api.Actor, output string, envelopeID api.UUID) (*Process, error)
LookupOrCreate returns the Process handling the given envelope, or creates one if necessary
func (*Manager) Replay ¶
func (m *Manager) Replay(process *Process, envelopeStorage storage.EnvelopeStorage, match bool) (*Process, error)
Replay creates a new process that replays the current process. The process must be terminated, as well as all the processes of the same group. Also, the trigger envelope must be complete and valid. If match is true, MatchPipeline is called again. If false, the same pipeline is used, even if deactivated. match is ignored if the process never matched a pipeline, in which case MatchPipeline is always called.
type Process ¶
type Process struct { ID api.UUID CreatedAt time.Time GroupID api.UUID ReplayOf api.UUID TriggerEmitterID api.UUID TriggerEnvelopeID api.UUID Pipeline *pipeline.Pipeline SourceRef storage.NodeOutputRef // contains filtered or unexported fields }
Process orchestrates a pipeline execution
func NewProcess ¶
func NewProcess( actorStorage storage.ActorStorage, emitterID, envelopeID api.UUID, pipeline *pipeline.Pipeline, source storage.NodeOutputRef, manager *Manager, ) *Process
NewProcess creates a process
func (*Process) ActorProcessingEnd ¶
func (p *Process) ActorProcessingEnd(actorID api.UUID, nodeID string, status api.ActorProcessingState_Status, log zerolog.Logger)
ActorProcessingEnd informs the process that the given actor has ended its processing on the given node, with the given status
func (*Process) Export ¶
func (p *Process) Export(envelopeStorage storage.EnvelopeStorage, log zerolog.Logger) (map[string]interface{}, error)
Export exports all the data of the process, including envelopes and logs, as a map suitable for JSON or YAML export. This implementation is not big-envelope friendly and is now deprecated. See ExportStream.
func (*Process) ExportStream ¶
func (p *Process) ExportStream( ctx context.Context, envelopeStorage storage.EnvelopeStorage, jobStorage storage.JobStorage, logStorage storage.LogStorage, log zerolog.Logger, out io.Writer, exportEnvelope bool, exportJobs bool, exportLogs bool, exportProcessLogs bool, envelopeSizeLimit int64, logLevel api.LogLevel, ) error
ExportStream exports the process as a stream, which supports big envelopes and logs
func (*Process) GetPostMortemState ¶
func (p *Process) GetPostMortemState() (storage.PostMortemState, error)
GetPostMortemState returns the postmortem state of the process
func (*Process) GetResponse ¶
GetResponse returns the process response envelope id if any
func (Process) GetStatus ¶
func (p Process) GetStatus() api.Process_Status
GetStatus returns the current status of the process
func (Process) GetTargets ¶
func (p Process) GetTargets(output storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool) ([]api.EnvelopeTarget, error)
GetTargets returns the targets for an envelope coming from a given emitter
func (*Process) MatchPipeline ¶
MatchPipeline attempts to find the pipeline that will drive the process
func (*Process) SetPostMortemState ¶
func (p *Process) SetPostMortemState(state storage.PostMortemState) error
SetPostMortemState sets the postmortem state of the process
func (*Process) UpdateReceiveStatus ¶
func (p *Process) UpdateReceiveStatus(envelopeID, actorID api.UUID, outputref storage.NodeOutputRef, status storage.ReceptionStatus) error
UpdateReceiveStatus is called whenever an envelope fragment is received from a node
type StateEngine ¶
type StateEngine struct {
// contains filtered or unexported fields
}
StateEngine provides an API to consistently modify a process state
func (*StateEngine) EndProcessNode ¶
func (e *StateEngine) EndProcessNode( node storage.ProcessNode, status api.ActorProcessingState_Status, timestamp time.Time, )
EndProcessNode signals the end of processing by a process node
func (*StateEngine) GetNodeState ¶
func (e *StateEngine) GetNodeState( node storage.ProcessNode, ) (storage.ProcessNodeState, storage.Node)
GetNodeState returns a copy of a node state and the node graph definition
func (*StateEngine) GetState ¶
func (e *StateEngine) GetState() storage.ProcessState
GetState returns the current state (which is expected to be consistent)
func (*StateEngine) HitInput ¶
func (e *StateEngine) HitInput( node storage.ProcessNode, input string, complete bool, closed bool, timestamp time.Time, )
HitInput signals a data emission to an input
func (*StateEngine) HitOutput ¶
func (e *StateEngine) HitOutput( node storage.ProcessNode, output string, status storage.ReceptionStatus, timestamp time.Time, )
HitOutput signals a data reception from an output
func (*StateEngine) SetNodeState ¶
func (e *StateEngine) SetNodeState(node storage.ProcessNode, state storage.ProcessNodeState)
SetNodeState updates a node state