process

package
v0.0.0-...-cdef871 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NextState

func NextState(
	pipeline *pipeline.Pipeline, state storage.ProcessState, logentry storage.ProcessLogEntry,
	log zerolog.Logger,
) (newState storage.ProcessState, err error)

NextState calculate the next state of a process after aplying a logentry

Types

type EnvelopeActorPair

type EnvelopeActorPair struct {
	EnvelopeID api.UUID
	ActorID    api.UUID
}

EnvelopeActorPair contains a pair (EnvelopeID, ActorID)

type Manager

type Manager struct {
	PipelineManager *pipeline.Manager
	ActorStorage    storage.ActorStorage
	ProcessStorage  storage.ProcessStorage

	Log zerolog.Logger

	CurrentProcessList []*Process
}

Manager manage the processes

func NewManager

func NewManager(
	pipelineManager *pipeline.Manager,
	actorStorage storage.ActorStorage,
	processStorage storage.ProcessStorage,
	logger zerolog.Logger,
) *Manager

NewManager creates a process.Manager

func (*Manager) Close

func (m *Manager) Close()

Close does nothing for now

func (*Manager) Get

func (m *Manager) Get(id api.UUID) (*Process, error)

Get returns a process from its ID

func (*Manager) GetByTrigger

func (m *Manager) GetByTrigger(emitter api.Actor, envelopeID api.UUID) (*Process, error)

GetByTrigger returns the process matching the given trigger emitter/envelope

func (*Manager) GetEmitterEnvelopeStatus

func (m *Manager) GetEmitterEnvelopeStatus(emitter api.Actor, envelopeID api.UUID) api.Process_Status

GetEmitterEnvelopeStatus returns the status of the process triggered by the given emitter/envelope

func (*Manager) ListPostMortem

func (m *Manager) ListPostMortem(level api.LogLevel, status ...api.PMProcess_Status) ([]*Process, error)

ListPostMortem returns dead processes

func (*Manager) LookupOrCreate

func (m *Manager) LookupOrCreate(emitter api.Actor, output string, envelopeID api.UUID) (*Process, error)

LookupOrCreate returns the Process handling the given envelope or create if necessary

func (*Manager) Purge

func (m *Manager) Purge(id api.UUID) error

Purge removes any trace of the process in the process storage.

func (*Manager) PurgeMany

func (m *Manager) PurgeMany(ids []api.UUID) ([]api.UUID, error)

PurgeMany removes any trace of the processes in the process storage.

func (*Manager) Replay

func (m *Manager) Replay(process *Process, envelopeStorage storage.EnvelopeStorage, match bool) (*Process, error)

Replay creates a new process that replays the current process. The process must be terminated, as well as all the processes of the same group. Also, the trigger envelope must be complete and valid. If match is true, the MatchPipeline is called again. If false, the same pipeline is used, even if deactivated. match is ignored if the process never matched a pipeline, in which case MatchPipeline is always called.

type Process

type Process struct {
	ID        api.UUID
	CreatedAt time.Time
	GroupID   api.UUID
	ReplayOf  api.UUID

	TriggerEmitterID  api.UUID
	TriggerEnvelopeID api.UUID

	Pipeline  *pipeline.Pipeline
	SourceRef storage.NodeOutputRef
	// contains filtered or unexported fields
}

Process orchestrate a pipeline execution

func NewProcess

func NewProcess(
	actorStorage storage.ActorStorage,
	emitterID,
	envelopeID api.UUID,
	pipeline *pipeline.Pipeline,
	source storage.NodeOutputRef,
	manager *Manager,
) *Process

NewProcess creates a process

func (*Process) ActorProcessingEnd

func (p *Process) ActorProcessingEnd(actorID api.UUID, nodeID string, status api.ActorProcessingState_Status, log zerolog.Logger)

ActorProcessingEnd informs the process

func (Process) Cancel

func (p Process) Cancel(reason string) error

Cancel sets the process state to ERROR with "canceled: <reason>" reason

func (*Process) Export

func (p *Process) Export(envelopeStorage storage.EnvelopeStorage, log zerolog.Logger) (map[string]interface{}, error)

Export exports all the data of the process, including envelopes and logs, as a map suitable for json or yaml export This implementation is not big-envelope friendly and is now deprecated. See ExportStream

func (*Process) ExportStream

func (p *Process) ExportStream(
	ctx context.Context,
	envelopeStorage storage.EnvelopeStorage,
	jobStorage storage.JobStorage,
	logStorage storage.LogStorage,
	log zerolog.Logger,
	out io.Writer,
	exportEnvelope bool,
	exportJobs bool,
	exportLogs bool,
	exportProcessLogs bool,
	envelopeSizeLimit int64,
	logLevel api.LogLevel,
) error

ExportStream export the process as a stream, which supports big envelopes and logs

func (*Process) GetPostMortemState

func (p *Process) GetPostMortemState() (storage.PostMortemState, error)

GetPostMortemState set the postmortem state of the process

func (*Process) GetResponse

func (p *Process) GetResponse() api.UUID

GetResponse returns the process response envelope id if any

func (Process) GetStatus

func (p Process) GetStatus() api.Process_Status

GetStatus returns the current status of the process

func (Process) GetTargets

func (p Process) GetTargets(output storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool) ([]api.EnvelopeTarget, error)

GetTargets returns the targets for an envelope coming from a given emitter

func (*Process) MatchPipeline

func (p *Process) MatchPipeline(output string, eventTypes []string) error

MatchPipeline attempts to find the pipeline that will drive the process

func (Process) Pause

func (p Process) Pause() error

Pause switch the process state to ProcessPaused

func (Process) Resume

func (p Process) Resume() error

Resume switch the process state to ProcessRunning if previously paused

func (*Process) SetPostMortemState

func (p *Process) SetPostMortemState(state storage.PostMortemState) error

SetPostMortemState set the postmortem state of the process

func (*Process) UpdateReceiveStatus

func (p *Process) UpdateReceiveStatus(envelopeID, actorID api.UUID, outputref storage.NodeOutputRef, status storage.ReceptionStatus) error

UpdateReceiveStatus is called whenever an envelope fragment is received from a node

type StateEngine

type StateEngine struct {
	// contains filtered or unexported fields
}

StateEngine provides api to consistently modify a process state

func (*StateEngine) EndProcessNode

func (e *StateEngine) EndProcessNode(
	node storage.ProcessNode,
	status api.ActorProcessingState_Status,
	timestamp time.Time,
)

EndProcessNode signal the end of processing by a process node

func (*StateEngine) GetNodeState

func (e *StateEngine) GetNodeState(
	node storage.ProcessNode,
) (storage.ProcessNodeState, storage.Node)

GetNodeState returns a copy of a node state and the node graph definition

func (*StateEngine) GetState

func (e *StateEngine) GetState() storage.ProcessState

GetState returns the current state (which is expected to be consistent)

func (*StateEngine) HitInput

func (e *StateEngine) HitInput(
	node storage.ProcessNode,
	input string,
	complete bool,
	closed bool,
	timestamp time.Time,
)

HitInput signal a data emission to an input

func (*StateEngine) HitOutput

func (e *StateEngine) HitOutput(
	node storage.ProcessNode,
	output string,
	status storage.ReceptionStatus,
	timestamp time.Time,
)

HitOutput signal a data reception from an output

func (*StateEngine) SetNodeState

func (e *StateEngine) SetNodeState(node storage.ProcessNode, state storage.ProcessNodeState)

SetNodeState updates a node state

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL