Documentation ¶
Index ¶
- Variables
- func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition
- func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)
- func NoopActorEventHandler(api.Actor)
- func VoidProcessLogEntryHandler([]ProcessLogEntry)
- type AccountStorage
- type ActorEventHandler
- type ActorStorage
- type Edge
- type Emission
- type EnvelopeFilter
- type EnvelopeReadPos
- type EnvelopeState
- type EnvelopeStorage
- type EnvelopeStorageEventHandler
- type EnvelopeStorageReceptionStatus
- type EventReadPos
- type EventState
- type Graph
- type Job
- type JobID
- type JobList
- type JobState
- type JobStorage
- type LogStorage
- type Node
- type NodeInputRef
- type NodeOutputRef
- type NodeType
- type PasswordStorage
- type Pipeline
- type PipelineStorage
- type PostMortemState
- type Process
- type ProcessEventStatusHandler
- type ProcessLogEntry
- type ProcessLogEntryHandler
- type ProcessNode
- type ProcessNodeEnd
- type ProcessNodeIOStatus
- type ProcessNodeIOStatusMap
- type ProcessNodeInput
- type ProcessNodeState
- type ProcessNodeStateMap
- type ProcessNodeStatus
- type ProcessState
- type ProcessStorage
- type Reception
- type ReceptionStatus
- type Session
- type SessionStorage
- type SourceMatch
- type StateUpdater
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidPipelineStatus is returned when the pipeline status does not
	// allow the operation
	ErrInvalidPipelineStatus = errors.New("Invalid Pipeline Status")
)
var (
	// ErrNoSuchID is returned when a non-existing id is passed to some function
	ErrNoSuchID = errors.New("No such ID")
)
Functions ¶
func EncodePosition ¶
func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition
EncodePosition translates a storage.EnvelopeReadPos into an api.EnvelopePosition
func NoOpProcessEventStatusHandler ¶
func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)
NoOpProcessEventStatusHandler is a noop handler
func NoopActorEventHandler ¶
func NoopActorEventHandler(api.Actor)
NoopActorEventHandler is an ActorEventHandler that does nothing
func VoidProcessLogEntryHandler ¶
func VoidProcessLogEntryHandler([]ProcessLogEntry)
VoidProcessLogEntryHandler is a noop ProcessLogEntryHandler
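One plausible use of the no-op handlers is as defaults, so that a component never has to nil-check its callback before calling it. The sketch below assumes this usage; ProcessLogEntry is trimmed to a stand-in, and notifier, newNotifier, setHandler and publish are hypothetical names.

```go
package main

import "fmt"

// ProcessLogEntry is a trimmed stand-in for the package type.
type ProcessLogEntry struct{ ID string }

// ProcessLogEntryHandler matches the documented handler type.
type ProcessLogEntryHandler func(entries []ProcessLogEntry)

// VoidProcessLogEntryHandler does nothing, like the documented function.
func VoidProcessLogEntryHandler([]ProcessLogEntry) {}

// notifier is a hypothetical component that owns a handler.
type notifier struct{ onEntries ProcessLogEntryHandler }

// newNotifier starts with the no-op handler so publish never calls nil.
func newNotifier() *notifier {
	return &notifier{onEntries: VoidProcessLogEntryHandler}
}

// setHandler installs h, falling back to the no-op handler when h is nil.
func (n *notifier) setHandler(h ProcessLogEntryHandler) {
	if h == nil {
		h = VoidProcessLogEntryHandler
	}
	n.onEntries = h
}

// publish forwards the entries to whichever handler is installed.
func (n *notifier) publish(entries []ProcessLogEntry) { n.onEntries(entries) }

func main() {
	n := newNotifier()
	n.publish([]ProcessLogEntry{{ID: "1"}}) // safe: the no-op handler is installed
	n.setHandler(func(e []ProcessLogEntry) { fmt.Println("got", len(e), "entries") })
	n.publish([]ProcessLogEntry{{ID: "2"}, {ID: "3"}})
}
```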
Types ¶
type AccountStorage ¶
type AccountStorage interface {
Get(id api.UUID) (api.Account, bool)
GetByName(name string) (api.Account, bool)
GetByCert(cert string) (api.Account, bool)
GetByAPIKey(key string) (api.Account, bool)
Create(api.Account) (api.Account, error)
Update(api.Account) (api.Account, error)
Delete(api.Account)
List() []api.Account
}
AccountStorage is in charge of storing account definitions. When multiple instances are running, CRUD operations should be reflected from one to the others.
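To illustrate the (value, found) convention used by the getters, here is a minimal in-memory sketch covering part of the interface. UUID and Account are simplified stand-ins for api.UUID and api.Account, memAccounts is a hypothetical type, and rejecting duplicate IDs in Create is an assumption. It works for a single instance only and does not address propagation between instances.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// UUID and Account are simplified stand-ins for api.UUID and api.Account.
type UUID string

type Account struct {
	ID   UUID
	Name string
}

// memAccounts is a hypothetical single-instance store.
type memAccounts struct {
	mu   sync.RWMutex
	byID map[UUID]Account
}

func newMemAccounts() *memAccounts {
	return &memAccounts{byID: make(map[UUID]Account)}
}

// Get follows the documented (value, found) convention.
func (s *memAccounts) Get(id UUID) (Account, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	a, ok := s.byID[id]
	return a, ok
}

// GetByName scans all accounts; a real backend would likely use an index.
func (s *memAccounts) GetByName(name string) (Account, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, a := range s.byID {
		if a.Name == name {
			return a, true
		}
	}
	return Account{}, false
}

// Create rejects duplicate IDs (an assumption, not documented behaviour).
func (s *memAccounts) Create(a Account) (Account, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.byID[a.ID]; exists {
		return Account{}, errors.New("account already exists")
	}
	s.byID[a.ID] = a
	return a, nil
}

// Delete removes the account; deleting an unknown ID is a no-op.
func (s *memAccounts) Delete(a Account) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.byID, a.ID)
}

func main() {
	s := newMemAccounts()
	if _, err := s.Create(Account{ID: "a1", Name: "alice"}); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	a, ok := s.GetByName("alice")
	fmt.Println(a.ID, ok)
}
```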
type ActorEventHandler ¶
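ActorEventHandler is the type of the callbacks invoked on actor events, such as the onActorUp and onActorDown handlers passed to ActorStorage.SetEventHandlers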
type ActorStorage ¶
type ActorStorage interface {
Get(api.UUID) (api.Actor, bool)
GetByName(name string) (api.Actor, bool)
List() []api.Actor
ListAccount(accountID api.UUID) []api.Actor
ListKind(actorKind api.Actor_Kind) []api.Actor
ListRole(actorKind api.Actor_Kind, role string) []api.Actor
Create(api.Actor) (api.Actor, error)
Delete(api.UUID)
Update(api.Actor) (api.Actor, error)
MarkUnresponsive(id api.UUID, decisionTime time.Time) error
JustSeen(api.UUID) error
SetOnline(api.UUID, bool) error
SetEventHandlers(onActorUp, onActorDown ActorEventHandler)
}
ActorStorage is in charge of storing the actor definitions. When multiple instances are running, CRUD operations should be reflected from one to the others.
type Edge ¶
type Edge struct {
From NodeOutputRef
To NodeInputRef
}
Edge is a directed edge of a Graph
func EdgeFromString ¶
EdgeFromString deserializes an Edge
func (Edge) MarshalYAML ¶
MarshalYAML marshals Edge to string
func (*Edge) UnmarshalYAML ¶
UnmarshalYAML unmarshals Edge from string
type Emission ¶
type Emission struct {
ProcessID api.UUID
EnvelopeID api.UUID
TargetActor api.UUID
TargetInputRef NodeInputRef
}
Emission identifies the emission of an envelope to a target
type EnvelopeFilter ¶
EnvelopeFilter filters a batch of envelope ids at once
type EnvelopeReadPos ¶
type EnvelopeReadPos struct {
Events map[api.UUID]EventReadPos
Start bool
Complete bool
}
EnvelopeReadPos contains the current reading position of each event of an envelope. It can be used to retrieve the next part of an envelope later.
func DecodePosition ¶
func DecodePosition(apiPos api.EnvelopePosition) EnvelopeReadPos
DecodePosition translates an api.EnvelopePosition into a storage.EnvelopeReadPos
func EnvelopeStartPos ¶
func EnvelopeStartPos() EnvelopeReadPos
EnvelopeStartPos returns a new EnvelopeReadPos pointing to the beginning of the envelope
func (EnvelopeReadPos) Clone ¶
func (p EnvelopeReadPos) Clone() EnvelopeReadPos
Clone duplicates the position
func (EnvelopeReadPos) Equals ¶
func (p EnvelopeReadPos) Equals(other EnvelopeReadPos) bool
Equals returns true if the positions are equal
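Because EnvelopeReadPos holds a map, a plain struct copy would share the per-event positions with the original. The sketch below shows one way Clone and Equals could behave under that constraint. It is not the package's actual implementation: EventReadPos is a stand-in whose Index field is hypothetical, since its real fields are not shown on this page.

```go
package main

import "fmt"

// UUID is a simplified stand-in for api.UUID.
type UUID string

// EventReadPos is a stand-in; the Index field is hypothetical.
type EventReadPos struct{ Index uint64 }

type EnvelopeReadPos struct {
	Events   map[UUID]EventReadPos
	Start    bool
	Complete bool
}

// Clone copies the map so the clone can advance independently.
func (p EnvelopeReadPos) Clone() EnvelopeReadPos {
	c := EnvelopeReadPos{Start: p.Start, Complete: p.Complete}
	if p.Events != nil {
		c.Events = make(map[UUID]EventReadPos, len(p.Events))
		for id, ev := range p.Events {
			c.Events[id] = ev
		}
	}
	return c
}

// Equals compares the flags and every per-event position.
func (p EnvelopeReadPos) Equals(o EnvelopeReadPos) bool {
	if p.Start != o.Start || p.Complete != o.Complete || len(p.Events) != len(o.Events) {
		return false
	}
	for id, ev := range p.Events {
		if other, ok := o.Events[id]; !ok || other != ev {
			return false
		}
	}
	return true
}

func main() {
	pos := EnvelopeReadPos{Start: true, Events: map[UUID]EventReadPos{"ev1": {Index: 3}}}
	saved := pos.Clone()
	pos.Events["ev1"] = EventReadPos{Index: 7} // advance the original only
	fmt.Println(saved.Equals(pos), saved.Events["ev1"].Index)
}
```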
type EnvelopeState ¶
type EnvelopeState struct {
ID api.UUID
TypesKnown bool // true if the type of all the events is known
Status EnvelopeStorageReceptionStatus
Err error
Events []EventState
}
EnvelopeState contains the detailed status of an envelope, allowing the broker to make decisions
func (EnvelopeState) Error ¶
func (s EnvelopeState) Error() string
Error implements the error interface in case Status is "error"
func (EnvelopeState) EventTypes ¶
func (s EnvelopeState) EventTypes() []string
EventTypes returns the event types or nil if not fully known yet
type EnvelopeStorage ¶
type EnvelopeStorage interface {
// Set the event handler. Any current handler gets overridden.
// nil can be passed to remove the current handler.
SetEventHandler(EnvelopeStorageEventHandler)
// Store part or all of an envelope.
// The returned EnvelopeState reflects the final state of the complete
// envelope, once the given part is merged into it.
//
// Concurrency & ordering:
// The envelope parts can be passed in any order, from different
// goroutines, and to different instances of the Storage in different
// processes all pointing to the same backend.
//
// Error handling:
// If any error occurs, including validity errors (which are reported
// within the EnvelopeState), the envelope should be marked as 'error'
// in the storage, and no other part of the envelope should be accepted.
StoreEnvelope(api.Envelope) (EnvelopeState, error)
// CheckStalled marks the given envelope as stalled if no fragment was
// received in the given time _and_ the envelope is in state "Receiving".
// Returns true if the envelope is indeed stalled.
// If the envelope is already marked stalled, the given time is ignored.
CheckStalled(api.UUID, time.Duration) bool
// GetEventTypes returns the eventtypes of an envelope, or nil if unknown
// exists is false if the envelope is unknown, true if it exists.
// Should panic on any internal error.
GetEventTypes(id api.UUID) (types []string, exists bool)
ReadEnvelope(id api.UUID, position EnvelopeReadPos, maxsize int) (api.Envelope, EnvelopeReadPos, error)
Purge(EnvelopeFilter)
}
EnvelopeStorage can store partial envelopes, track the completion of the events inside them, and serve the data back in chunks
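Serving data back in chunks suggests a read loop that feeds the position returned by one ReadEnvelope call into the next, until the position reports completion. The sketch below illustrates that loop against a fake in-memory storage. Envelope and ReadPos are reduced stand-ins for api.Envelope and EnvelopeReadPos, the Offset field is hypothetical, and treating maxsize as an item count is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// UUID is a simplified stand-in for api.UUID.
type UUID string

// Envelope and ReadPos are reduced stand-ins for api.Envelope and
// EnvelopeReadPos; Offset is hypothetical.
type Envelope struct{ Items []string }

type ReadPos struct {
	Offset   int
	Complete bool
}

// fakeStorage serves fully received envelopes from memory.
type fakeStorage struct{ data map[UUID][]string }

// ReadEnvelope returns at most maxsize items starting at pos, together with
// the position to use for the next call.
func (s fakeStorage) ReadEnvelope(id UUID, pos ReadPos, maxsize int) (Envelope, ReadPos, error) {
	items, ok := s.data[id]
	if !ok {
		return Envelope{}, pos, errors.New("no such envelope")
	}
	if maxsize <= 0 {
		return Envelope{}, pos, errors.New("maxsize must be positive")
	}
	end := pos.Offset + maxsize
	if end > len(items) {
		end = len(items)
	}
	next := ReadPos{Offset: end, Complete: end == len(items)}
	return Envelope{Items: items[pos.Offset:end]}, next, nil
}

// readAll drains an envelope chunk by chunk and counts the reads it needed.
func readAll(s fakeStorage, id UUID, chunk int) ([]string, int, error) {
	var out []string
	reads := 0
	pos := ReadPos{}
	for !pos.Complete {
		env, next, err := s.ReadEnvelope(id, pos, chunk)
		if err != nil {
			return nil, reads, err
		}
		out = append(out, env.Items...)
		pos = next
		reads++
	}
	return out, reads, nil
}

func main() {
	s := fakeStorage{data: map[UUID][]string{"env1": {"a", "b", "c", "d", "e"}}}
	items, reads, err := readAll(s, "env1", 2)
	fmt.Println(items, reads, err)
}
```

With a real storage the envelope may still be arriving, so a caller would probably have to wait for more data (for example through the event handler) instead of looping tightly.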
type EnvelopeStorageEventHandler ¶
type EnvelopeStorageEventHandler func(
	envelopeID api.UUID,
	status EnvelopeStorageReceptionStatus,
	newData bool,
)
EnvelopeStorageEventHandler is a handler for EnvelopeStorage events. newData is true if more data is now available on the envelope
type EnvelopeStorageReceptionStatus ¶
type EnvelopeStorageReceptionStatus int
EnvelopeStorageReceptionStatus is the reception status of an envelope or an event
const (
	// EnvelopeStorageReceptionUnknown is when the reception status is unknown
	EnvelopeStorageReceptionUnknown EnvelopeStorageReceptionStatus = iota
	// EnvelopeStorageReceptionReceiving is when the reception is not finished and
	// a piece of data was received not long ago
	EnvelopeStorageReceptionReceiving
	// EnvelopeStorageReceptionComplete is when the envelope or event has fully arrived
	EnvelopeStorageReceptionComplete
	// EnvelopeStorageReceptionError is when an error occurred
	EnvelopeStorageReceptionError
	// EnvelopeStorageReceptionStalled is when an envelope misses some fragment,
	// nothing comes in for a given time, and some storage client is waiting
	EnvelopeStorageReceptionStalled
)
func (EnvelopeStorageReceptionStatus) MarshalText ¶
func (i EnvelopeStorageReceptionStatus) MarshalText() ([]byte, error)
func (EnvelopeStorageReceptionStatus) String ¶
func (i EnvelopeStorageReceptionStatus) String() string
func (*EnvelopeStorageReceptionStatus) UnmarshalText ¶
func (i *EnvelopeStorageReceptionStatus) UnmarshalText(text []byte) error
type EventReadPos ¶
EventReadPos contains the current reading position of an event
type EventState ¶
type EventState struct {
ID api.UUID
Type string
AvailableItems uint64 // number of items that can be read in a sequence
TotalItems uint64 // total number of items or 0 if unknown
Status EnvelopeStorageReceptionStatus
Err error
}
EventState contains the detailed reception state of an event
func (EventState) Error ¶
func (s EventState) Error() string
Error implements the error interface in case Status is "error"
type JobState ¶
type JobState struct {
Detached bool
Status api.ActorProcessingState_Status
EnvelopeIDs []api.UUID
}
JobState contains all the job state
type JobStorage ¶
type JobStorage interface {
NewJob(JobID, JobState)
SetState(JobID, JobState)
GetState(JobID) JobState
ListPendingJobByActor(actor api.UUID, after *JobID, limit int) []Job
ListJobByProcess(api.UUID) []Job
ListRunningJobByEnvelopeID(api.UUID) []Job
ListDetachedJobByActor(actorID api.UUID) (JobList, error)
Purge([]api.UUID) error
}
JobStorage persists the jobs
type LogStorage ¶
type LogStorage interface {
Log(api.LogEntry)
GetProcessMessages(processID api.UUID, level api.LogLevel) []api.LogEntry
PurgeBefore(time.Time) int64
}
A LogStorage stores log entries and can return them given some filters
type Node ¶
type Node struct {
ID string
Type NodeType
ActorIDs []api.UUID // Explicit actors. Deprecated (will be removed in a later version)
Actors []string // Explicit actors, by name or UUID
Roles []string // Actor roles
RoleBroadcast bool // If true, all actors matching each role will receive the message, instead of only one actor per role
SourceMatch *SourceMatch // define only if Type==EmitterNode
Inputs []string
Outputs []string
}
Node is a graph vertex
type NodeInputRef ¶
NodeInputRef is a reference to a node input
func NodeInputRefFromString ¶
func NodeInputRefFromString(str string) (ref NodeInputRef, err error)
NodeInputRefFromString loads a NodeInputRef
func (NodeInputRef) MarshalYAML ¶
func (r NodeInputRef) MarshalYAML() (interface{}, error)
MarshalYAML marshals NodeInputRef to string
func (*NodeInputRef) UnmarshalYAML ¶
func (r *NodeInputRef) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals NodeInputRef from string
type NodeOutputRef ¶
NodeOutputRef is a reference to a node output
func NodeOutputRefFromString ¶
func NodeOutputRefFromString(str string) (ref NodeOutputRef, err error)
NodeOutputRefFromString loads a NodeOutputRef
func (NodeOutputRef) MarshalYAML ¶
func (r NodeOutputRef) MarshalYAML() (interface{}, error)
MarshalYAML marshals NodeOutputRef to string
func (NodeOutputRef) String ¶
func (r NodeOutputRef) String() string
String serializes a NodeOutputRef
func (*NodeOutputRef) UnmarshalYAML ¶
func (r *NodeOutputRef) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals NodeOutputRef from string
type NodeType ¶
type NodeType int
NodeType is the type of Node
func (NodeType) MarshalText ¶
func (*NodeType) UnmarshalText ¶
type PasswordStorage ¶
type PasswordStorage interface {
Set(id api.UUID, password string)
Reset(id api.UUID)
Verify(id api.UUID, password string) bool
}
PasswordStorage stores passwords in a safe manner
type Pipeline ¶
type Pipeline struct {
api.PipelineInfo
Graph Graph
}
Pipeline defines how an envelope should be processed
type PipelineStorage ¶
type PipelineStorage interface {
Load(api.UUID) (Pipeline, bool)
Save(Pipeline) (api.UUID, error)
SetStatus(api.UUID, api.PipelineInfo_Status) error
LoadActivePipeline() []Pipeline
Query(name, version string, activeOnly bool) []Pipeline
}
PipelineStorage handles Pipeline persistence
type PostMortemState ¶
type PostMortemState struct {
Level api.LogLevel
Status api.PMProcess_Status
Comment string
Changed time.Time
}
PostMortemState describes the post-mortem state of a Process
type Process ¶
type Process struct {
ID api.UUID
TriggerEmitterID api.UUID
TriggerEnvelopeID api.UUID
CreatedAt time.Time
PipelineID api.UUID
SourceOutput NodeOutputRef
}
Process contains the data necessary for process persistence
type ProcessEventStatusHandler ¶
type ProcessEventStatusHandler func(p Process, old api.Process_Status, new api.Process_Status)
ProcessEventStatusHandler is the type of functions that handle process status changes
type ProcessLogEntry ¶
type ProcessLogEntry struct {
ID string
ProcessID api.UUID
Timestamp time.Time
Emission *Emission `json:",omitempty"`
EmissionPosition *EnvelopeReadPos `json:",omitempty"`
Reception *Reception `json:",omitempty"`
ReceptionStatus ReceptionStatus `json:",omitempty"`
ProcessNodeEnd *ProcessNodeEnd `json:",omitempty"`
ResultAck bool `json:",omitempty"`
}
ProcessLogEntry is the unique source of information for process tracking.
type ProcessLogEntryHandler ¶
type ProcessLogEntryHandler func(entries []ProcessLogEntry)
ProcessLogEntryHandler is the type of functions that handle process log entries
type ProcessNode ¶
A ProcessNode is a live pipeline graph node. One graph node can correspond to several ProcessNode (especially consumer nodes)
func ProcessNodeFromString ¶
func ProcessNodeFromString(s string) (ProcessNode, error)
ProcessNodeFromString deserializes a ProcessNode
func (ProcessNode) MarshalText ¶
func (n ProcessNode) MarshalText() ([]byte, error)
MarshalText implements encoding.TextMarshaler for ProcessNode
func (*ProcessNode) UnmarshalText ¶
func (n *ProcessNode) UnmarshalText(text []byte) (err error)
UnmarshalText implements encoding.TextUnmarshaler for ProcessNode
type ProcessNodeEnd ¶
type ProcessNodeEnd struct {
Node ProcessNode
Status api.ActorProcessingState_Status
}
ProcessNodeEnd signals the end of processing by a process node
type ProcessNodeIOStatus ¶
type ProcessNodeIOStatus int
ProcessNodeIOStatus reflects the status of the inputs or outputs of a node
const (
	// ProcessNodeIOPending is when no io has started yet
	ProcessNodeIOPending ProcessNodeIOStatus = iota
	// ProcessNodeIOStarted is when one or more io started emitting
	ProcessNodeIOStarted
	// ProcessNodeIOAllStarted is when all io started emitting
	ProcessNodeIOAllStarted
	// ProcessNodeIOClosed is when all the io is closed without any transmission
	ProcessNodeIOClosed
	// ProcessNodeIODone is when all io finished emitting
	ProcessNodeIODone
	// ProcessNodeIOError is when an error occurred on one of the io
	ProcessNodeIOError
)
func (ProcessNodeIOStatus) MarshalText ¶
func (i ProcessNodeIOStatus) MarshalText() ([]byte, error)
func (ProcessNodeIOStatus) String ¶
func (i ProcessNodeIOStatus) String() string
func (*ProcessNodeIOStatus) UnmarshalText ¶
func (i *ProcessNodeIOStatus) UnmarshalText(text []byte) error
type ProcessNodeIOStatusMap ¶
type ProcessNodeIOStatusMap map[string]ProcessNodeIOStatus
ProcessNodeIOStatusMap is a map of emission or reception statuses
type ProcessNodeInput ¶
ProcessNodeInput describes which envelope goes on a given input
type ProcessNodeState ¶
type ProcessNodeState struct {
Status ProcessNodeStatus
Error string
Input ProcessNodeIOStatus
InputStatuses ProcessNodeIOStatusMap
Output ProcessNodeIOStatus
OutputStatuses ProcessNodeIOStatusMap
}
ProcessNodeState reflects the current state of a process node.
type ProcessNodeStateMap ¶
type ProcessNodeStateMap map[ProcessNode]ProcessNodeState
ProcessNodeStateMap contains states of process nodes
func (ProcessNodeStateMap) MarshalYAML ¶
func (m ProcessNodeStateMap) MarshalYAML() (interface{}, error)
MarshalYAML marshals the map as an ordered dict
type ProcessNodeStatus ¶
type ProcessNodeStatus int
ProcessNodeStatus is the process node status
const (
	// ProcessNodeStatusPending is before node activation
	ProcessNodeStatusPending ProcessNodeStatus = iota
	// ProcessNodeActive is when any input or output is active
	ProcessNodeActive
	// ProcessNodeDone is when all input/output are done transmitting
	ProcessNodeDone
	// ProcessNodeUnreachable is when every path leading to the node
	// is closed and the node will never be active
	ProcessNodeUnreachable
	// ProcessNodeError is when an error occurred on the node
	ProcessNodeError
)
func (ProcessNodeStatus) MarshalText ¶
func (i ProcessNodeStatus) MarshalText() ([]byte, error)
func (ProcessNodeStatus) String ¶
func (i ProcessNodeStatus) String() string
func (*ProcessNodeStatus) UnmarshalText ¶
func (i *ProcessNodeStatus) UnmarshalText(text []byte) error
type ProcessState ¶
type ProcessState struct {
Status api.Process_Status
StatusChanged time.Time
StatusReason string
NodeStates ProcessNodeStateMap
NodeStatesChanged time.Time
ResponseEnvelopeID api.UUID
ResultAcked bool
}
ProcessState reflects the current state of a process execution. It is completely updated by applying incoming log entries.
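Deriving the state purely from log entries amounts to a fold: start from an empty state and apply each entry in order. The sketch below illustrates the idea only. NodeEnd, LogEntry and State are reduced stand-ins for ProcessNodeEnd, ProcessLogEntry and ProcessState, and the Failed flag and the "done"/"error" strings are hypothetical. The package's real update rules are not described on this page.

```go
package main

import "fmt"

// NodeEnd and LogEntry are reduced stand-ins for ProcessNodeEnd and
// ProcessLogEntry; the Failed flag is hypothetical.
type NodeEnd struct {
	Node   string
	Failed bool
}

type LogEntry struct {
	ID        string
	NodeEnd   *NodeEnd
	ResultAck bool
}

// State is a reduced stand-in for ProcessState.
type State struct {
	NodeStates  map[string]string
	ResultAcked bool
}

// apply folds one log entry into the state and returns the updated state.
// The NodeStates map is allocated on first use and then updated in place.
func apply(s State, e LogEntry) State {
	if e.NodeEnd != nil {
		if s.NodeStates == nil {
			s.NodeStates = make(map[string]string)
		}
		if e.NodeEnd.Failed {
			s.NodeStates[e.NodeEnd.Node] = "error"
		} else {
			s.NodeStates[e.NodeEnd.Node] = "done"
		}
	}
	if e.ResultAck {
		s.ResultAcked = true
	}
	return s
}

// replay rebuilds the state from scratch by applying the entries in order.
func replay(entries []LogEntry) State {
	var s State
	for _, e := range entries {
		s = apply(s, e)
	}
	return s
}

func main() {
	s := replay([]LogEntry{
		{ID: "1", NodeEnd: &NodeEnd{Node: "parse"}},
		{ID: "2", NodeEnd: &NodeEnd{Node: "store", Failed: true}},
		{ID: "3", ResultAck: true},
	})
	fmt.Println(s.NodeStates["parse"], s.NodeStates["store"], s.ResultAcked)
}
```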
func (ProcessState) Fmt ¶
func (s ProcessState) Fmt() string
Fmt returns a formatted string of the state
type ProcessStorage ¶
type ProcessStorage interface {
GetProcess(api.UUID) (Process, bool)
GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (Process, bool)
Query(filter api.ProcessFilter) []Process
FindProcessForEnvelope(envelopeIDs []api.UUID) map[api.UUID]api.UUID
CreateProcess(Process) (api.UUID, error)
SetPipeline(processID api.UUID, pipelineID api.UUID, sourceOutput NodeOutputRef)
GetTargets(
processID api.UUID,
source NodeOutputRef,
envelopeID api.UUID,
noRouteTableUpdate bool,
calcTargets func() ([]api.EnvelopeTarget, bool, error),
) ([]api.EnvelopeTarget, error)
GetInputs(
processID api.UUID,
targetNode ProcessNode,
) []ProcessNodeInput
UpdateReceiveStatus(reception Reception, status ReceptionStatus) error
ProcessNodeEnd(
process api.UUID, node ProcessNode,
status api.ActorProcessingState_Status) error
GetState(api.UUID) ProcessState
GetStateUpdater(api.UUID) StateUpdater
SetStatus(api.UUID, api.Process_Status, string) error
// AckResult generates an AckResult process log entry
AckResult(api.UUID)
SetEventHandlers(onStatusChanged ProcessEventStatusHandler)
SetProcessLogEntryHandler(ProcessLogEntryHandler)
GetUnprocessedProcessLogEntries() []ProcessLogEntry
GetProcessLogEntries(api.UUID) []ProcessLogEntry
QueryPostMortem(level api.LogLevel, status ...api.PMProcess_Status) []Process
GetPostMortemState(api.UUID) PostMortemState
SetPostMortemState(api.UUID, PostMortemState)
// PurgeProcess removes all data of some processes.
// The processes must be terminated.
// Returns the list of purged processes. If some ids are missing, the
// processes did not exist or had a non-terminal status.
PurgeProcess([]api.UUID) ([]api.UUID, error)
}
ProcessStorage persists process data
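Since PurgeProcess reports only the ids it actually purged, a caller that wants to know which ids were skipped has to compute the difference itself. The following sketch shows one way to do that. notPurged is a hypothetical helper and UUID is a simplified stand-in for api.UUID.

```go
package main

import "fmt"

// UUID is a simplified stand-in for api.UUID.
type UUID string

// notPurged returns the requested ids that are absent from the purged list,
// preserving the order of the request.
func notPurged(requested, purged []UUID) []UUID {
	done := make(map[UUID]struct{}, len(purged))
	for _, id := range purged {
		done[id] = struct{}{}
	}
	var missing []UUID
	for _, id := range requested {
		if _, ok := done[id]; !ok {
			missing = append(missing, id)
		}
	}
	return missing
}

func main() {
	requested := []UUID{"p1", "p2", "p3"}
	purged := []UUID{"p1", "p3"} // as if returned by PurgeProcess
	fmt.Println("skipped:", notPurged(requested, purged))
}
```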
type Reception ¶
type Reception struct {
ProcessID api.UUID
EnvelopeID api.UUID
SourceActor api.UUID
SourceOutputRef NodeOutputRef
}
Reception identifies a reception of an envelope from an actor
type ReceptionStatus ¶
type ReceptionStatus int
ReceptionStatus describes the status of an envelope reception in the context of a process
const (
	// ReceptionPending is when envelope reception has not yet started
	ReceptionPending ReceptionStatus = iota
	// ReceptionOngoing is when part(s) of the envelope were received, but not all
	ReceptionOngoing
	// ReceptionDone is when the complete envelope has been received
	ReceptionDone
	// ReceptionError is when a reception error occurred
	ReceptionError
	// ReceptionClosed is when no envelope is to be expected
	ReceptionClosed
)
func (ReceptionStatus) MarshalText ¶
func (s ReceptionStatus) MarshalText() ([]byte, error)
MarshalText serializes a ReceptionStatus to text
func (ReceptionStatus) String ¶
func (s ReceptionStatus) String() string
func (*ReceptionStatus) UnmarshalText ¶
func (s *ReceptionStatus) UnmarshalText(data []byte) error
UnmarshalText parses a ReceptionStatus
type Session ¶
type Session struct {
api.SessionToken
AccountID api.UUID
}
Session links a SessionToken and an Account
type SessionStorage ¶
type SessionStorage interface {
List() ([]Session, error)
Get(token string) (*Session, error)
Set(Session) error
Delete(token string) error
DeleteOlderThan(time time.Time) error
}
SessionStorage stores Sessions
type SourceMatch ¶
type SourceMatch struct {
EventTypes []string // The exact set of event types the envelope must contain
}
SourceMatch holds the rules that incoming envelopes are matched against
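The comment on EventTypes says the envelope must contain exactly these types. One possible reading, sketched below, is an order-insensitive comparison of the two lists. The Matches method is hypothetical and the order-insensitive interpretation is an assumption; the package may compare differently.

```go
package main

import (
	"fmt"
	"sort"
)

// SourceMatch mirrors the documented struct.
type SourceMatch struct {
	EventTypes []string
}

// Matches reports whether eventTypes holds exactly the expected types,
// ignoring order. It works on sorted copies so the inputs are not modified.
func (m SourceMatch) Matches(eventTypes []string) bool {
	if len(m.EventTypes) != len(eventTypes) {
		return false
	}
	want := append([]string(nil), m.EventTypes...)
	got := append([]string(nil), eventTypes...)
	sort.Strings(want)
	sort.Strings(got)
	for i := range want {
		if want[i] != got[i] {
			return false
		}
	}
	return true
}

func main() {
	m := SourceMatch{EventTypes: []string{"invoice", "customer"}}
	fmt.Println(m.Matches([]string{"customer", "invoice"}))
	fmt.Println(m.Matches([]string{"customer"}))
}
```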
type StateUpdater ¶
type StateUpdater interface {
Get() (ProcessState, error)
Set(ProcessState) error
MarkLogEntry(id string, asError bool) error
Unlock() error
}
StateUpdater provides exclusive access to a process state