Version: v0.0.0-...-821bb50 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2021 License: MIT Imports: 8 Imported by: 0




This section is empty.


View Source
var (
	// ErrInvalidPipelineStatus is returned when the pipeline status does not
	// allow the operation
	ErrInvalidPipelineStatus = errors.New("Invalid Pipeline Status")
View Source
var (
	// ErrNoSuchID is returned when a non-existing id is passed to some function
	ErrNoSuchID = errors.New("No such ID")


func EncodePosition

func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition

EncodePosition translates a storage.EnvelopeReadPos into an api.EnvelopePosition

func NoOpProcessEventStatusHandler

func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)

NoOpProcessEventStatusHandler is a noop handler

func NoopActorEventHandler

func NoopActorEventHandler(api.Actor)

NoopActorEventHandler is an ActorEventHandler that does nothing

func VoidProcessLogEntryHandler

func VoidProcessLogEntryHandler([]ProcessLogEntry)

VoidProcessLogEntryHandler is a noop ProcessLogEntryHandler


type AccountStorage

type AccountStorage interface {
	Get(id api.UUID) (api.Account, bool)
	GetByName(name string) (api.Account, bool)
	GetByCert(cert string) (api.Account, bool)
	GetByAPIKey(key string) (api.Account, bool)

	Create(api.Account) (api.Account, error)
	Update(api.Account) (api.Account, error)

	List() []api.Account

AccountStorage is in charge of storing account definitions. When multiple instances are running, CRUD operations should be reflected from one to the others

type ActorEventHandler

type ActorEventHandler func(api.Actor)

ActorEventHandler is the type for callbacks

type ActorStorage

type ActorStorage interface {
	Get(api.UUID) (api.Actor, bool)
	GetByName(name string) (api.Actor, bool)
	List() []api.Actor
	ListAccount(accountID api.UUID) []api.Actor
	ListKind(actorKind api.Actor_Kind) []api.Actor
	ListRole(actorKind api.Actor_Kind, role string) []api.Actor

	Create(api.Actor) (api.Actor, error)
	Update(api.Actor) (api.Actor, error)

	MarkUnresponsive(id api.UUID, decisionTime time.Time) error
	JustSeen(api.UUID) error
	SetOnline(api.UUID, bool) error

	SetEventHandlers(onActorUp, onActorDown ActorEventHandler)

ActorStorage is in charge of storing the actor definitions. When multiple instances are running, CRUD operations should be reflected from one to the others

type Datatype

type Datatype string
const (
	DatatypeEnvelope Datatype = "envelope"

func (Datatype) String

func (d Datatype) String() string

type Edge

type Edge struct {
	From NodeOutputRef
	To   NodeInputRef

Edge is a Graph oriented edge

func EdgeFromString

func EdgeFromString(str string) (Edge, error)

EdgeFromString deserializes an Edge

func MustEdgeFromString

func MustEdgeFromString(str string) Edge

MustEdgeFromString deserializes an Edge and panics if it fails

func (Edge) MarshalYAML

func (e Edge) MarshalYAML() (interface{}, error)

MarshalYAML marshals Edge to string

func (Edge) String

func (e Edge) String() string

String serializes an Edge

func (*Edge) UnmarshalYAML

func (e *Edge) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals Edge from string

type Emission

type Emission struct {
	ProcessID      api.UUID
	EnvelopeID     api.UUID
	TargetActor    api.UUID
	TargetInputRef NodeInputRef

Emission identifies the emission of an envelope to a target

type EnvelopeFilter

type EnvelopeFilter func([]api.UUID) []api.UUID

EnvelopeFilter filters a batch of envelope ids at once

type EnvelopeReadPos

type EnvelopeReadPos struct {
	Events   map[api.UUID]EventReadPos
	Start    bool
	Complete bool

EnvelopeReadPos contains the current reading position of each event of an envelope. It can be used to retrieve the next part of an envelope later

func DecodePosition

func DecodePosition(apiPos api.EnvelopePosition) EnvelopeReadPos

DecodePosition translates an api.EnvelopePosition into a storage.EnvelopeReadPos

func EnvelopeStartPos

func EnvelopeStartPos() EnvelopeReadPos

EnvelopeStartPos returns a new EnvelopeReadPos pointing to the beginning of the envelope

func (EnvelopeReadPos) Clone

func (p EnvelopeReadPos) Clone() EnvelopeReadPos

Clone duplicates the position

func (EnvelopeReadPos) Equals

func (p EnvelopeReadPos) Equals(other EnvelopeReadPos) bool

Equals returns true if the positions are equal

type EnvelopeState

type EnvelopeState struct {
	ID         api.UUID
	TypesKnown bool // true if the type of all the events is known
	Status     EnvelopeStorageReceptionStatus
	Err        error

	Events []EventState

EnvelopeState contains the detailed status of an envelope, allowing the broker to make decisions

func (EnvelopeState) Error

func (s EnvelopeState) Error() string

Error implements the error interface in case Status is "error"

func (EnvelopeState) EventTypes

func (s EnvelopeState) EventTypes() []string

EventTypes returns the event types or nil if not fully known yet

type EnvelopeStorage

type EnvelopeStorage interface {
	// Set the event handler. Any current handler gets overridden.
	// nil can be passed to remove the current handler.

	// Store part or all of an envelope.
	// The returned EnvelopeState reflects the final state of the complete
	// envelope, once the given part is merged into it.
	// Concurrency & ordering:
	// The envelope parts can be passed in any order, from different
	// goroutines, and to different instances of the Storage in different
	// processes all pointed to the same backend.
	// Error handling:
	// If any error occurs, including validity errors (which are reported
	// within the EnvelopeState), the envelope should be marked as 'error'
	// in the storage, and no other part of the envelope should be accepted.
	StoreEnvelope(api.Envelope) (EnvelopeState, error)

	// CheckStalled marks the given envelope as stalled if no fragment was
	// received in the given time _and_ the envelope is in state "Receiving"
	// Returns true if the envelope is indeed stalled
	// If the envelope is already marked stalled, the given time is ignored.
	CheckStalled(api.UUID, time.Duration) bool

	// GetEventTypes returns the eventtypes of an envelope, or nil if unknown
	// exists is false if the envelope is unknown, true if exists.
	// should panic on any internal error.
	GetEventTypes(id api.UUID) (types []string, exists bool)

	// ReceptionStatus returns the envelope reception status
	// Must return a (possibly nested) ErrNoSuchID if the envelope does not exist
	ReceptionStatus(id api.UUID) (EnvelopeStorageReceptionStatus, error)

	// Must return a (possibly nested) ErrNoSuchID if the envelope does not exist
	ReadEnvelope(id api.UUID, position EnvelopeReadPos, maxsize int) (api.Envelope, EnvelopeReadPos, error)


	Stat() ([]control.StorageStat_Entry, error)

EnvelopeStorage can store partial envelopes, track the completion of the events inside them, and serve back the data in chunks

type EnvelopeStorageEventHandler

type EnvelopeStorageEventHandler func(
	envelopeID api.UUID, status EnvelopeStorageReceptionStatus, newData bool)

EnvelopeStorageEventHandler is a handler for EnvelopeStorage events. newData is true if more data is now available on the envelope

type EnvelopeStorageReceptionStatus

type EnvelopeStorageReceptionStatus int

EnvelopeStorageReceptionStatus is the reception status of an envelope or an event

const (
	// EnvelopeStorageReceptionUnknown is when the reception status is unknown
	EnvelopeStorageReceptionUnknown EnvelopeStorageReceptionStatus = iota

	// EnvelopeStorageReceptionReceiving is when the reception is not finished and
	// a piece of data was received not long ago

	// EnvelopeStorageReceptionComplete is when the envelope or event is fully arrived

	// EnvelopeStorageReceptionError is when an error occurred

	// EnvelopeStorageReceptionStalled is when an envelope misses some fragment
	// and nothing comes in for a given time and some storage client is waiting

func (EnvelopeStorageReceptionStatus) MarshalText

func (i EnvelopeStorageReceptionStatus) MarshalText() ([]byte, error)

func (EnvelopeStorageReceptionStatus) String

func (*EnvelopeStorageReceptionStatus) UnmarshalText

func (i *EnvelopeStorageReceptionStatus) UnmarshalText(text []byte) error

type EventReadPos

type EventReadPos struct {
	Index     uint64
	ItemCount uint64

EventReadPos contains the current reading position of an event

type EventState

type EventState struct {
	ID             api.UUID
	Type           string
	AvailableItems uint64 // number of items that can be read in a sequence
	TotalItems     uint64 // total number of items or 0 if unknown
	Status         EnvelopeStorageReceptionStatus
	Err            error

	ReadPos EventReadPos

EventState contains the detailed reception state of an event

func (EventState) Error

func (s EventState) Error() string

Error implements the error interface in case Status is "error"

type Graph

type Graph struct {
	Nodes []Node
	Edges []Edge

Graph stores the graph definition of a pipeline

type Job

type Job struct {

Job is a director job definition and state

func AllJobs

func AllJobs(list JobList) ([]Job, error)

AllJobs returns a complete list and closes the given JobList

type JobID

type JobID struct {
	ProcessID api.UUID
	Node      ProcessNode

JobID is a job's unique id.

func (JobID) String

func (id JobID) String() string

type JobList

type JobList interface {
	Next() bool
	Get() (Job, error)

JobList is for returning long job lists in a stream

type JobState

type JobState struct {
	CreatedAt   time.Time
	Detached    bool
	Status      api.ActorProcessingState_Status
	EnvelopeIDs []api.UUID

	FirstAttempt time.Time
	AttemptCount uint

JobState contains all the job state

type JobStorage

type JobStorage interface {
	NewJob(JobID, JobState)
	SetState(JobID, JobState)
	GetState(JobID) JobState

	ListPendingJobByActor(actor api.UUID, after *JobID, limit int) []Job
	ListJobByProcess(api.UUID) []Job
	ListRunningJobByEnvelopeID(api.UUID) []Job
	ListDetachedJobByActor(actorID api.UUID) (JobList, error)

	Purge([]api.UUID) error

JobStorage persists the jobs

type LogStorage

type LogStorage interface {
	GetProcessMessages(processID api.UUID, level api.LogLevel) []api.LogEntry
	PurgeBefore(time.Time) int64
	Stat() (*control.StorageStat_Entry, error)

A LogStorage stores log entries and can return them given some filters

type NoSuchIDError

type NoSuchIDError struct {
	// contains filtered or unexported fields

func NewNoSuchIDError

func NewNoSuchIDError(datatype Datatype, id api.UUID) *NoSuchIDError

func (NoSuchIDError) Error

func (err NoSuchIDError) Error() string

func (NoSuchIDError) Unwrap

func (err NoSuchIDError) Unwrap() error

type Node

type Node struct {
	ID            string
	Type          NodeType
	ActorIDs      []api.UUID // Explicit actors. Deprecated (will be removed in a later version)
	Actors        []string   // Explicit actors, by name or UUID
	Roles         []string   // Actor roles
	RoleBroadcast bool       // If true, all actors matching each role will receive the message, instead of only one actor per role

	SourceMatch *SourceMatch // define only if Type==EmitterNode

	Inputs  []string
	Outputs []string

Node is a graph vertex

type NodeInputRef

type NodeInputRef struct {
	NodeID string
	Input  string

NodeInputRef is a reference to a node input

func NodeInputRefFromString

func NodeInputRefFromString(str string) (ref NodeInputRef, err error)

NodeInputRefFromString loads a NodeInputRef

func (NodeInputRef) MarshalYAML

func (r NodeInputRef) MarshalYAML() (interface{}, error)

MarshalYAML marshals NodeInputRef to string

func (NodeInputRef) String

func (r NodeInputRef) String() string

String serializes a NodeInputRef

func (*NodeInputRef) UnmarshalYAML

func (r *NodeInputRef) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals NodeInputRef from string

type NodeOutputRef

type NodeOutputRef struct {
	NodeID string
	Output string

NodeOutputRef is a reference to a node output

func NodeOutputRefFromString

func NodeOutputRefFromString(str string) (ref NodeOutputRef, err error)

NodeOutputRefFromString loads a NodeOutputRef

func (NodeOutputRef) MarshalYAML

func (r NodeOutputRef) MarshalYAML() (interface{}, error)

MarshalYAML marshals NodeOutputRef to string

func (NodeOutputRef) String

func (r NodeOutputRef) String() string

String serializes a NodeOutputRef

func (*NodeOutputRef) UnmarshalYAML

func (r *NodeOutputRef) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals NodeOutputRef from string

type NodeType

type NodeType int

NodeType is the type of Node

const (
	// EmitterNode is the type of emitter nodes
	EmitterNode NodeType = iota
	// ConsumerNode is the type of consumer nodes
	// WorkerNode is the type of worker nodes
	// RequestorNode is an emitter that waits for a reply

func (NodeType) MarshalText

func (i NodeType) MarshalText() ([]byte, error)

func (NodeType) String

func (i NodeType) String() string

func (*NodeType) UnmarshalText

func (i *NodeType) UnmarshalText(text []byte) error

type PasswordStorage

type PasswordStorage interface {
	Set(id api.UUID, password string)
	Reset(id api.UUID)
	Verify(id api.UUID, password string) bool

PasswordStorage stores passwords in a safe manner

type Pipeline

type Pipeline struct {
	Graph Graph

Pipeline defines how an envelope should be processed

type PipelineStorage

type PipelineStorage interface {
	Load(api.UUID) (Pipeline, bool)
	Save(Pipeline) (api.UUID, error)
	SetStatus(api.UUID, api.PipelineInfo_Status) error
	LoadActivePipeline() []Pipeline
	Query(name, version string, activeOnly bool) []Pipeline

PipelineStorage handles Pipeline persistence

type PostMortemState

type PostMortemState struct {
	Level   api.LogLevel
	Status  api.PMProcess_Status
	Comment string
	Changed time.Time

PostMortemState describes the post-mortem state of a Process

type Process

type Process struct {
	ID                api.UUID
	GroupID           api.UUID
	TriggerEmitterID  api.UUID
	TriggerEnvelopeID api.UUID
	CreatedAt         time.Time
	ReplayOf          api.UUID
	PipelineID        api.UUID
	SourceOutput      NodeOutputRef

Process contains the data necessary for process persistence

type ProcessEventStatusHandler

type ProcessEventStatusHandler func(p Process, old api.Process_Status, new api.Process_Status)

ProcessEventStatusHandler is the type of functions that handle status changes

type ProcessLogEntry

type ProcessLogEntry struct {
	ID               string
	ProcessID        api.UUID
	Timestamp        time.Time
	Emission         *Emission        `json:",omitempty"`
	EmissionPosition *EnvelopeReadPos `json:",omitempty"`
	Reception        *Reception       `json:",omitempty"`
	ReceptionStatus  ReceptionStatus  `json:",omitempty"`
	ProcessNodeEnd   *ProcessNodeEnd  `json:",omitempty"`
	ResultAck        bool             `json:",omitempty"`

ProcessLogEntry is the unique source of information for process tracking.

type ProcessLogEntryHandler

type ProcessLogEntryHandler func(entries []ProcessLogEntry)

ProcessLogEntryHandler is the type of functions that handle process log entries

type ProcessNode

type ProcessNode struct {
	ActorID api.UUID
	NodeID  string

A ProcessNode is a live pipeline graph node. One graph node can correspond to several ProcessNode (especially consumer nodes)

func ProcessNodeFromString

func ProcessNodeFromString(s string) (ProcessNode, error)

ProcessNodeFromString deserializes a ProcessNode

func (ProcessNode) MarshalText

func (n ProcessNode) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler for ProcessNode

func (ProcessNode) String

func (n ProcessNode) String() string

String serializes a ProcessNode

func (*ProcessNode) UnmarshalText

func (n *ProcessNode) UnmarshalText(text []byte) (err error)

UnmarshalText implements encoding.TextUnmarshaler for ProcessNode

type ProcessNodeEnd

type ProcessNodeEnd struct {
	Node   ProcessNode
	Status api.ActorProcessingState_Status

ProcessNodeEnd signals the end of processing by a process node

type ProcessNodeIOStatus

type ProcessNodeIOStatus int

ProcessNodeIOStatus reflects the status of the inputs or outputs of a node

const (
	// ProcessNodeIOPending is when no io started yet
	ProcessNodeIOPending ProcessNodeIOStatus = iota
	// ProcessNodeIOStarted is when one or more io started emitting
	// ProcessNodeIOAllStarted is when all io started emitting
	// ProcessNodeIOClosed is when all the io is closed without any transmission
	// ProcessNodeIODone is when all io finished emitting
	// ProcessNodeIOError is when an error occurred on one of the io

func (ProcessNodeIOStatus) MarshalText

func (i ProcessNodeIOStatus) MarshalText() ([]byte, error)

func (ProcessNodeIOStatus) String

func (i ProcessNodeIOStatus) String() string

func (*ProcessNodeIOStatus) UnmarshalText

func (i *ProcessNodeIOStatus) UnmarshalText(text []byte) error

type ProcessNodeIOStatusMap

type ProcessNodeIOStatusMap map[string]ProcessNodeIOStatus

ProcessNodeIOStatusMap is a map of emission or reception statuses

type ProcessNodeInput

type ProcessNodeInput struct {
	Name       string
	EnvelopeID api.UUID

ProcessNodeInput describes which envelope goes on a given input

type ProcessNodeState

type ProcessNodeState struct {
	Status         ProcessNodeStatus
	Error          string
	Input          ProcessNodeIOStatus
	InputStatuses  ProcessNodeIOStatusMap
	Output         ProcessNodeIOStatus
	OutputStatuses ProcessNodeIOStatusMap

ProcessNodeState reflects the current state of a process node.

type ProcessNodeStateMap

type ProcessNodeStateMap map[ProcessNode]ProcessNodeState

ProcessNodeStateMap contains states of process nodes

func (ProcessNodeStateMap) MarshalYAML

func (m ProcessNodeStateMap) MarshalYAML() (interface{}, error)

MarshalYAML as an ordered dict

type ProcessNodeStatus

type ProcessNodeStatus int

ProcessNodeStatus is the process node status

const (
	// ProcessNodeStatusPending is before node activation
	ProcessNodeStatusPending ProcessNodeStatus = iota
	// ProcessNodeActive is when any input or output is active
	// ProcessNodeDone is when all input/output are done transmitting
	// ProcessNodeUnreachable is when every path leading to the node
	// are closed and the node will never be active
	// ProcessNodeError is when an error occurred on the node

func (ProcessNodeStatus) MarshalText

func (i ProcessNodeStatus) MarshalText() ([]byte, error)

func (ProcessNodeStatus) String

func (i ProcessNodeStatus) String() string

func (*ProcessNodeStatus) UnmarshalText

func (i *ProcessNodeStatus) UnmarshalText(text []byte) error

type ProcessState

type ProcessState struct {
	Status            api.Process_Status
	StatusChanged     time.Time
	StatusReason      string
	NodeStates        ProcessNodeStateMap
	NodeStatesChanged time.Time

	ResponseEnvelopeID api.UUID
	ResultAcked        bool

ProcessState reflects the current state of a process execution. It is completely updated by applying incoming log entries

func (ProcessState) Fmt

func (s ProcessState) Fmt() string

Fmt returns a formatted string of the state

type ProcessStorage

type ProcessStorage interface {
	GetProcess(api.UUID) (Process, bool)
	GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (Process, bool)

	Query(filter api.ProcessFilter) []Process
	QuerySummary(filter api.ProcessFilter) (ProcessSummary, error)

	FindProcessForEnvelope(envelopeIDs []api.UUID) map[api.UUID]api.UUID

	CreateProcess(Process) (api.UUID, error)
	SetPipeline(processID api.UUID, pipelineID api.UUID, sourceOutput NodeOutputRef)

		processID api.UUID,
		source NodeOutputRef,
		envelopeID api.UUID,
		noRouteTableUpdate bool,
		calcTargets func() ([]api.EnvelopeTarget, bool, error),
	) ([]api.EnvelopeTarget, error)

		processID api.UUID,
		targetNode ProcessNode,
	) []ProcessNodeInput

	UpdateReceiveStatus(reception Reception, status ReceptionStatus) error

		process api.UUID, node ProcessNode,
		status api.ActorProcessingState_Status) error

	GetState(api.UUID) ProcessState
	GetStateUpdater(api.UUID) StateUpdater
	SetStatus(api.UUID, api.Process_Status, string) error

	// Generate an AckResult process log entry

	SetEventHandlers(onStatusChanged ProcessEventStatusHandler)

	GetUnprocessedProcessLogEntries() []ProcessLogEntry
	GetProcessLogEntries(api.UUID) []ProcessLogEntry

	QueryPostMortem(level api.LogLevel, status ...api.PMProcess_Status) []Process
	GetPostMortemState(api.UUID) PostMortemState
	SetPostMortemState(api.UUID, PostMortemState)

	// PurgeProcess removes all data of some processes.
	// The process must be terminated
	// Returns the list of purged processes. If some ids are missing, those
	// processes were non-existent or had a non-terminal status
	PurgeProcess([]api.UUID) ([]api.UUID, error)

ProcessStorage persists process data

type ProcessSummary

type ProcessSummary []ProcessSummaryEntry

ProcessSummary is a set of statistics about the process

type ProcessSummaryEntry

type ProcessSummaryEntry struct {
	PipelineID  api.UUID
	EmitterID   api.UUID
	Status      api.Process_Status
	ResultAcked bool

	Count uint64

ProcessSummaryEntry is a single entry of a ProcessSummary

type Reception

type Reception struct {
	ProcessID       api.UUID
	EnvelopeID      api.UUID
	SourceActor     api.UUID
	SourceOutputRef NodeOutputRef

Reception identifies a reception of an envelope from an actor

type ReceptionStatus

type ReceptionStatus int

ReceptionStatus describes the status of an envelope reception in the context of a process

const (
	// ReceptionPending is when envelope reception is not yet started
	ReceptionPending ReceptionStatus = iota
	// ReceptionOngoing is when part(s) of the envelope were received, but not all
	// ReceptionDone is when the complete envelope has been received
	// ReceptionError is when a reception error occurred
	// ReceptionClosed is when no envelope is to be expected

func (ReceptionStatus) MarshalText

func (s ReceptionStatus) MarshalText() ([]byte, error)

MarshalText serializes a ReceptionStatus to text

func (ReceptionStatus) String

func (s ReceptionStatus) String() string

func (*ReceptionStatus) UnmarshalText

func (s *ReceptionStatus) UnmarshalText(data []byte) error

UnmarshalText parses a ReceptionStatus

type Session

type Session struct {
	AccountID api.UUID

Session links a SessionToken and an Account

type SessionStorage

type SessionStorage interface {
	List() ([]Session, error)
	Get(token string) (*Session, error)
	Set(Session) error
	Delete(token string) error
	DeleteOlderThan(time time.Time) error

SessionStorage stores Sessions

type SourceMatch

type SourceMatch struct {
	EventTypes []string // All event types envelope must contain exactly

SourceMatch contains the rules to match incoming envelopes against

type StateUpdater

type StateUpdater interface {
	Get() (ProcessState, error)
	Set(ProcessState) error
	MarkLogEntry(id string, asError bool) error
	Unlock() error

StateUpdater provides exclusive access to a process state


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL