storage

package
v0.0.0-...-e652847 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrInvalidPipelineStatus is the sentinel error returned when the current
// status of a pipeline does not allow the requested operation.
var ErrInvalidPipelineStatus = errors.New("Invalid Pipeline Status")
View Source
// ErrNoSuchID is the sentinel error returned when a function is given an id
// that does not exist.
var ErrNoSuchID = errors.New("No such ID")

Functions

func EncodePosition

func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition

EncodePosition translates a storage.EnvelopeReadPos into an api.EnvelopePosition.

func NoOpProcessEventStatusHandler

func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)

NoOpProcessEventStatusHandler is a noop handler

func NoopActorEventHandler

func NoopActorEventHandler(api.Actor)

NoopActorEventHandler is an ActorEventHandler that does nothing

func VoidProcessLogEntryHandler

func VoidProcessLogEntryHandler([]ProcessLogEntry)

VoidProcessLogEntryHandler is a noop ProcessLogEntryHandler

Types

type AccountStorage

// AccountStorage is in charge of storing account definitions.
// When multiple instances are running, CRUD operations are expected to be
// reflected from one instance to the others.
type AccountStorage interface {
	// Lookups of a single account by id, name, certificate or API key.
	// NOTE(review): the bool result presumably reports whether a matching
	// account was found — confirm against the implementations.
	Get(id api.UUID) (api.Account, bool)
	GetByName(name string) (api.Account, bool)
	GetByCert(cert string) (api.Account, bool)
	GetByAPIKey(key string) (api.Account, bool)

	// Create and Update return an api.Account alongside the error; Delete
	// takes the whole account value and reports nothing.
	Create(api.Account) (api.Account, error)
	Update(api.Account) (api.Account, error)
	Delete(api.Account)

	// List returns the stored accounts.
	List() []api.Account
}

AccountStorage is in charge of storing account definitions. When multiple instances are running, CRUD operations must be reflected from one to the others.

type ActorEventHandler

// ActorEventHandler is the callback type for actor events; it receives the
// api.Actor concerned. ActorStorage.SetEventHandlers takes two of them
// (onActorUp and onActorDown).
type ActorEventHandler func(api.Actor)

ActorEventHandler is the type for callbacks

type ActorStorage

// ActorStorage is in charge of storing the actor definitions.
// When multiple instances are running, CRUD operations are expected to be
// reflected from one instance to the others.
type ActorStorage interface {
	// Lookups of a single actor, by id or by name.
	// NOTE(review): the bool result presumably reports whether the actor
	// exists — confirm against the implementations.
	Get(api.UUID) (api.Actor, bool)
	GetByName(name string) (api.Actor, bool)
	// Listings: all actors, or restricted to an account, to a kind, or to
	// a kind and a role.
	List() []api.Actor
	ListAccount(accountID api.UUID) []api.Actor
	ListKind(actorKind api.Actor_Kind) []api.Actor
	ListRole(actorKind api.Actor_Kind, role string) []api.Actor

	// CRUD operations on actor definitions. Note that Delete takes an id
	// and returns nothing.
	Create(api.Actor) (api.Actor, error)
	Delete(api.UUID)
	Update(api.Actor) (api.Actor, error)

	// Liveness bookkeeping for a given actor id.
	// NOTE(review): what each call changes on the stored actor is not
	// visible from this interface — check the implementations.
	MarkUnresponsive(id api.UUID, decisionTime time.Time) error
	JustSeen(api.UUID) error
	SetOnline(api.UUID, bool) error

	// SetEventHandlers registers the onActorUp and onActorDown callbacks.
	SetEventHandlers(onActorUp, onActorDown ActorEventHandler)
}

ActorStorage is in charge of storing the actor definitions. When multiple instances are running, CRUD operations must be reflected from one to the others.

type Edge

// Edge is a directed edge of a Graph: it links a node output to a node input.
type Edge struct {
	// From is the node output the edge starts at.
	From NodeOutputRef
	// To is the node input the edge ends at.
	To   NodeInputRef
}

Edge is a directed edge of a Graph.

func EdgeFromString

func EdgeFromString(str string) (Edge, error)

EdgeFromString deserializes an Edge.

func (Edge) MarshalYAML

func (e Edge) MarshalYAML() (interface{}, error)

MarshalYAML marshals Edge to string

func (Edge) String

func (e Edge) String() string

String serializes an Edge.

func (*Edge) UnmarshalYAML

func (e *Edge) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals Edge from string

type Emission

// Emission identifies the emission of an envelope to a target.
type Emission struct {
	// ProcessID and EnvelopeID identify the process and the envelope
	// being emitted.
	ProcessID      api.UUID
	EnvelopeID     api.UUID
	// TargetActor and TargetInputRef designate the target: an actor id
	// and a node input.
	TargetActor    api.UUID
	TargetInputRef NodeInputRef
}

Emission identifies the emission of an envelope to a target

type EnvelopeFilter

// EnvelopeFilter filters a batch of envelope ids at once. It is the argument
// type of EnvelopeStorage.Purge.
// NOTE(review): whether the returned ids are the ones to keep or the ones to
// purge is not visible here — confirm against Purge implementations.
type EnvelopeFilter func([]api.UUID) []api.UUID

EnvelopeFilter filters a batch of envelope ids at once

type EnvelopeReadPos

// EnvelopeReadPos contains the current reading position of each event of an
// envelope. It can be used to retrieve the next part of an envelope later.
type EnvelopeReadPos struct {
	// Events holds one EventReadPos per event.
	// NOTE(review): the key is presumably the event id — confirm.
	Events   map[api.UUID]EventReadPos
	// NOTE(review): Start and Complete presumably flag a position at the
	// very beginning / at the very end of the envelope — confirm.
	Start    bool
	Complete bool
}

EnvelopeReadPos contains the current reading position of each event of an envelope. It can be used to retrieve the next part of an envelope later.

func DecodePosition

func DecodePosition(apiPos api.EnvelopePosition) EnvelopeReadPos

DecodePosition translates an api.EnvelopePosition into a storage.EnvelopeReadPos.

func EnvelopeStartPos

func EnvelopeStartPos() EnvelopeReadPos

EnvelopeStartPos returns a new EnvelopeReadPos pointing to the beginning of the envelope.

func (EnvelopeReadPos) Clone

func (p EnvelopeReadPos) Clone() EnvelopeReadPos

Clone duplicates the position.

func (EnvelopeReadPos) Equals

func (p EnvelopeReadPos) Equals(other EnvelopeReadPos) bool

Equals returns true if the positions are equal.

type EnvelopeState

// EnvelopeState contains the detailed status of an envelope, allowing the
// broker to make decisions. It has an Error method, so a value can be used
// as an error.
type EnvelopeState struct {
	// ID is the envelope id.
	ID         api.UUID
	TypesKnown bool // true if the type of all the events is known
	// Status is the reception status of the envelope as a whole.
	Status     EnvelopeStorageReceptionStatus
	// NOTE(review): Err is presumably only set when Status is the error
	// status — confirm.
	Err        error

	// Events holds the per-event reception state.
	Events []EventState
}

EnvelopeState contains the detailed status of an envelope, allowing the broker to make decisions.

func (EnvelopeState) Error

func (s EnvelopeState) Error() string

Error implements the error interface in case Status is "error"

func (EnvelopeState) EventTypes

func (s EnvelopeState) EventTypes() []string

EventTypes returns the event types or nil if not fully known yet

type EnvelopeStorage

// EnvelopeStorage can store partial envelopes, track the completion of the
// events inside them, and serve back the data in chunks.
type EnvelopeStorage interface {
	// Set the event handler. Any current handler gets overridden.
	// nil can be passed to remove the current handler.
	SetEventHandler(EnvelopeStorageEventHandler)

	// Store part or all of an envelope.
	// The returned EnvelopeState reflects the final state of the complete
	// envelope, once the given part is merged into it.
	//
	// Concurrency & ordering:
	// The envelope parts can be passed in any order, from different
	// goroutines, and to different instances of the Storage in different
	// processes all pointed to the same backend.
	//
	// Error handling:
	// If any error occurs, including validity errors (which are reported
	// within the EnvelopeState), the envelope should be marked as 'error'
	// in the storage, and no other part of the envelope should be accepted.
	StoreEnvelope(api.Envelope) (EnvelopeState, error)

	// CheckStalled marks the given envelope as stalled if no fragment was
	// received in the given time _and_ the envelope is in state "Receiving".
	// Returns true if the envelope is indeed stalled.
	// If the envelope is already marked stalled, the given time is ignored.
	CheckStalled(api.UUID, time.Duration) bool

	// GetEventTypes returns the event types of an envelope, or nil if unknown.
	// exists is false if the envelope is unknown, true if it exists.
	// Implementations should panic on any internal error.
	GetEventTypes(id api.UUID) (types []string, exists bool)

	// ReadEnvelope reads envelope data starting from the given position and
	// returns an EnvelopeReadPos along with the data.
	// NOTE(review): the returned position is presumably the one to pass to
	// the next call, and maxsize presumably bounds the size of the returned
	// chunk (unit not visible here) — confirm against the implementations.
	ReadEnvelope(id api.UUID, position EnvelopeReadPos, maxsize int) (api.Envelope, EnvelopeReadPos, error)

	// Purge removes envelopes selected through the given EnvelopeFilter.
	// NOTE(review): whether the filter returns the ids to keep or the ids
	// to remove is not visible here — confirm.
	Purge(EnvelopeFilter)
}

EnvelopeStorage can store partial envelopes, track the completion of the events inside them, and serve back the data in chunks.

type EnvelopeStorageEventHandler

// EnvelopeStorageEventHandler is a handler for EnvelopeStorage events.
// It receives the envelope id and a reception status; newData is true if
// more data is now available on the envelope.
type EnvelopeStorageEventHandler func(
	envelopeID api.UUID, status EnvelopeStorageReceptionStatus, newData bool)

EnvelopeStorageEventHandler is a handler for EnvelopeStorage events. newData is true if more data is now available on the envelope.

type EnvelopeStorageReceptionStatus

// EnvelopeStorageReceptionStatus is the reception status of an envelope or
// of an event. Its values are the EnvelopeStorageReception* constants.
type EnvelopeStorageReceptionStatus int

EnvelopeStorageReceptionStatus is the reception status of an envelope or an event

// Values of EnvelopeStorageReceptionStatus. They are numbered with iota, so
// the declaration order defines the numeric values.
const (
	// EnvelopeStorageReceptionUnknown is when the reception status is unknown.
	// It is the zero value of the type.
	EnvelopeStorageReceptionUnknown EnvelopeStorageReceptionStatus = iota

	// EnvelopeStorageReceptionReceiving is when the reception is not finished and
	// a piece of data was received not long ago
	EnvelopeStorageReceptionReceiving

	// EnvelopeStorageReceptionComplete is when the envelope or event has fully arrived
	EnvelopeStorageReceptionComplete

	// EnvelopeStorageReceptionError is when an error occurred
	EnvelopeStorageReceptionError

	// EnvelopeStorageReceptionStalled is when an envelope misses some fragment
	// and nothing comes in for a given time and some storage client is waiting
	EnvelopeStorageReceptionStalled
)

func (EnvelopeStorageReceptionStatus) MarshalText

func (i EnvelopeStorageReceptionStatus) MarshalText() ([]byte, error)

func (EnvelopeStorageReceptionStatus) String

func (*EnvelopeStorageReceptionStatus) UnmarshalText

func (i *EnvelopeStorageReceptionStatus) UnmarshalText(text []byte) error

type EventReadPos

// EventReadPos contains the current reading position of an event.
// NOTE(review): the exact meaning of Index versus ItemCount (e.g. chunk index
// versus number of items already read) is not visible here — confirm.
type EventReadPos struct {
	Index     uint64
	ItemCount uint64
}

EventReadPos contains the current reading position of an event.

type EventState

// EventState contains the detailed reception state of an event. It has an
// Error method, so a value can be used as an error.
type EventState struct {
	// ID is the event id and Type its event type.
	ID             api.UUID
	Type           string
	AvailableItems uint64 // number of items that can be read in a sequence
	TotalItems     uint64 // total number of items or 0 if unknown
	// Status is the reception status of this event.
	Status         EnvelopeStorageReceptionStatus
	// NOTE(review): Err is presumably only set when Status is the error
	// status — confirm.
	Err            error
}

EventState contains the detailed reception state of an event.

func (EventState) Error

func (s EventState) Error() string

Error implements the error interface in case Status is "error"

type Graph

// Graph stores the graph definition of a pipeline: its vertices (Nodes) and
// the edges connecting node outputs to node inputs (Edges).
type Graph struct {
	Nodes []Node
	Edges []Edge
}

Graph stores the graph definition of a pipeline

type Job

// Job is a director job definition and state. It embeds JobID (the identity)
// and JobState (the mutable state), so their fields are promoted onto Job.
type Job struct {
	JobID
	JobState
}

Job is a director job definition and state

func AllJobs

func AllJobs(list JobList) ([]Job, error)

AllJobs returns the complete list of jobs and closes the given JobList.

type JobID

// JobID is the unique id of a job: a process id together with a ProcessNode.
type JobID struct {
	ProcessID api.UUID
	Node      ProcessNode
}

JobID is a unique job id.

type JobList

// JobList is for returning long job lists in a stream.
// NOTE(review): the usual iterator protocol is presumably intended — call
// Next until it returns false, Get the current Job after each successful
// Next, and always Close when done — confirm against the implementations.
type JobList interface {
	Next() bool
	Get() (Job, error)
	Close()
}

JobList is for returning long job lists in a stream

type JobState

// JobState contains all the job state.
type JobState struct {
	// NOTE(review): the meaning of Detached is not visible here; see
	// JobStorage.ListDetachedJobByActor for the related query.
	Detached    bool
	// Status is the actor processing status of the job.
	Status      api.ActorProcessingState_Status
	// EnvelopeIDs lists envelope ids attached to the job; see
	// JobStorage.ListRunningJobByEnvelopeID for the related query.
	EnvelopeIDs []api.UUID
}

JobState contains all the job state

type JobStorage

// JobStorage persists the jobs.
type JobStorage interface {
	// NewJob, SetState and GetState create a job and write/read its state.
	// None of them returns an error.
	// NOTE(review): how failures are reported (panic?) is not visible here.
	NewJob(JobID, JobState)
	SetState(JobID, JobState)
	GetState(JobID) JobState

	// Job listings by actor, process or envelope id.
	// NOTE(review): after and limit presumably implement pagination (jobs
	// following *after, at most limit of them) — confirm.
	ListPendingJobByActor(actor api.UUID, after *JobID, limit int) []Job
	ListJobByProcess(api.UUID) []Job
	ListRunningJobByEnvelopeID(api.UUID) []Job
	// ListDetachedJobByActor is the only listing that returns a streaming
	// JobList rather than a slice.
	ListDetachedJobByActor(actorID api.UUID) (JobList, error)

	// NOTE(review): the ids given to Purge are presumably process ids —
	// confirm.
	Purge([]api.UUID) error
}

JobStorage persists the jobs

type LogStorage

// A LogStorage stores log entries and can return them given some filters.
type LogStorage interface {
	// Log stores one entry.
	Log(api.LogEntry)
	// GetProcessMessages returns log entries of a process for a log level.
	// NOTE(review): whether level is an exact match or a minimum severity
	// is not visible here — confirm.
	GetProcessMessages(processID api.UUID, level api.LogLevel) []api.LogEntry
	// NOTE(review): the int64 result of PurgeBefore is presumably the number
	// of entries removed — confirm.
	PurgeBefore(time.Time) int64
}

A LogStorage stores log entries and can return them given some filters

type Node

// Node is a graph vertex of a pipeline Graph. A node designates the actors
// involved either explicitly (ActorIDs, Actors) or through Roles, and
// declares its named Inputs and Outputs.
type Node struct {
	ID            string
	Type          NodeType
	ActorIDs      []api.UUID // Explicit actors. Deprecated (will be removed in a later version)
	Actors        []string   // Explicit actors, by name or UUID
	Roles         []string   // Actor roles
	RoleBroadcast bool       // If true, all actors matching each role will receive the message, instead of only one actor per role

	SourceMatch *SourceMatch // defined only if Type==EmitterNode

	// Inputs and Outputs are the names of the node inputs and outputs.
	Inputs  []string
	Outputs []string
}

Node is a graph vertex

type NodeInputRef

// NodeInputRef is a reference to a node input: the id of the node and the
// name of one of its inputs.
type NodeInputRef struct {
	NodeID string
	Input  string
}

NodeInputRef is a reference to a node input

func NodeInputRefFromString

func NodeInputRefFromString(str string) (ref NodeInputRef, err error)

NodeInputRefFromString loads a NodeInputRef

func (NodeInputRef) MarshalYAML

func (r NodeInputRef) MarshalYAML() (interface{}, error)

MarshalYAML marshals NodeInputRef to string

func (NodeInputRef) String

func (r NodeInputRef) String() string

String serializes a NodeInputRef.

func (*NodeInputRef) UnmarshalYAML

func (r *NodeInputRef) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals NodeInputRef from string

type NodeOutputRef

// NodeOutputRef is a reference to a node output: the id of the node and the
// name of one of its outputs.
type NodeOutputRef struct {
	NodeID string
	Output string
}

NodeOutputRef is a reference to a node output

func NodeOutputRefFromString

func NodeOutputRefFromString(str string) (ref NodeOutputRef, err error)

NodeOutputRefFromString loads a NodeOutputRef

func (NodeOutputRef) MarshalYAML

func (r NodeOutputRef) MarshalYAML() (interface{}, error)

MarshalYAML marshals NodeOutputRef to string

func (NodeOutputRef) String

func (r NodeOutputRef) String() string

String serializes a NodeOutputRef.

func (*NodeOutputRef) UnmarshalYAML

func (r *NodeOutputRef) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML unmarshals NodeOutputRef from string

type NodeType

// NodeType is the type of a Node. Its values are EmitterNode, ConsumerNode,
// WorkerNode and RequestorNode.
type NodeType int

NodeType is the type of Node

// Values of NodeType. They are numbered with iota, so the declaration order
// defines the numeric values and EmitterNode is the zero value.
const (
	// EmitterNode is the type of emitter nodes
	EmitterNode NodeType = iota
	// ConsumerNode is the type of consumer nodes
	ConsumerNode
	// WorkerNode is the type of worker nodes
	WorkerNode
	// RequestorNode is an emitter that waits for a reply
	RequestorNode
)

func (NodeType) MarshalText

func (i NodeType) MarshalText() ([]byte, error)

func (NodeType) String

func (i NodeType) String() string

func (*NodeType) UnmarshalText

func (i *NodeType) UnmarshalText(text []byte) error

type PasswordStorage

// PasswordStorage stores passwords in a safe manner.
type PasswordStorage interface {
	// Set associates a password with the given id.
	// NOTE(review): the id is presumably an account id — confirm.
	Set(id api.UUID, password string)
	// NOTE(review): Reset presumably removes the stored password — confirm.
	Reset(id api.UUID)
	// Verify reports whether password is accepted for the given id.
	Verify(id api.UUID, password string) bool
}

PasswordStorage stores passwords in a safe manner

type Pipeline

// Pipeline defines how an envelope should be processed. It embeds
// api.PipelineInfo (so its fields are promoted) and adds the Graph definition.
type Pipeline struct {
	api.PipelineInfo
	Graph Graph
}

Pipeline defines how an envelope should be processed

type PipelineStorage

// PipelineStorage handles Pipeline persistence.
type PipelineStorage interface {
	// NOTE(review): the bool returned by Load presumably reports whether
	// the pipeline exists — confirm.
	Load(api.UUID) (Pipeline, bool)
	// Save stores a pipeline and returns an id.
	Save(Pipeline) (api.UUID, error)
	// NOTE(review): SetStatus presumably returns ErrInvalidPipelineStatus
	// when the status change is not allowed — confirm.
	SetStatus(api.UUID, api.PipelineInfo_Status) error
	LoadActivePipeline() []Pipeline
	// Query looks pipelines up by name and version; activeOnly restricts
	// the result to active pipelines.
	Query(name, version string, activeOnly bool) []Pipeline
}

PipelineStorage handles Pipeline persistence.

type PostMortemState

// PostMortemState describes the post-mortem state of a Process. It is read
// and written through ProcessStorage.GetPostMortemState / SetPostMortemState.
type PostMortemState struct {
	Level   api.LogLevel
	Status  api.PMProcess_Status
	Comment string
	// NOTE(review): Changed is presumably the time of the last
	// modification of this state — confirm.
	Changed time.Time
}

PostMortemState describes the post-mortem state of a Process

type Process

// Process contains the data necessary for process persistence.
type Process struct {
	ID                api.UUID
	// TriggerEmitterID and TriggerEnvelopeID record the emitter and the
	// envelope that triggered the process (see
	// ProcessStorage.GetProcessByTrigger).
	TriggerEmitterID  api.UUID
	TriggerEnvelopeID api.UUID
	CreatedAt         time.Time
	// PipelineID and SourceOutput are the values given to
	// ProcessStorage.SetPipeline.
	PipelineID        api.UUID
	SourceOutput      NodeOutputRef
}

Process contains the data necessary for process persistence

type ProcessEventStatusHandler

// ProcessEventStatusHandler is the type of functions that handle process
// status changes. It receives the process, its old status and its new status.
// It is registered with ProcessStorage.SetEventHandlers.
type ProcessEventStatusHandler func(p Process, old api.Process_Status, new api.Process_Status)

ProcessEventStatusHandler is the type of functions that handle status changes

type ProcessLogEntry

// ProcessLogEntry is the unique source of information for process tracking.
//
// Every field after Timestamp is tagged omitempty, so nil pointers and zero
// values are left out of the JSON encoding. This includes a ReceptionStatus
// of ReceptionPending (the zero value) and a false ResultAck.
// NOTE(review): an entry presumably carries only one kind of payload
// (emission, reception, node end or result ack) — confirm.
type ProcessLogEntry struct {
	ID               string
	ProcessID        api.UUID
	Timestamp        time.Time
	Emission         *Emission        `json:",omitempty"`
	EmissionPosition *EnvelopeReadPos `json:",omitempty"`
	Reception        *Reception       `json:",omitempty"`
	ReceptionStatus  ReceptionStatus  `json:",omitempty"`
	ProcessNodeEnd   *ProcessNodeEnd  `json:",omitempty"`
	ResultAck        bool             `json:",omitempty"`
}

ProcessLogEntry is the unique source of information for process tracking.

type ProcessLogEntryHandler

// ProcessLogEntryHandler is the type of functions that handle process log
// entries; it receives a batch of entries. It is registered with
// ProcessStorage.SetProcessLogEntryHandler.
type ProcessLogEntryHandler func(entries []ProcessLogEntry)

ProcessLogEntryHandler is the type of functions that handle process log entries

type ProcessNode

// A ProcessNode is a live pipeline graph node: a graph node id paired with an
// actor id. One graph node can correspond to several ProcessNodes (especially
// consumer nodes).
type ProcessNode struct {
	ActorID api.UUID
	NodeID  string
}

A ProcessNode is a live pipeline graph node. One graph node can correspond to several ProcessNode (especially consumer nodes)

func ProcessNodeFromString

func ProcessNodeFromString(s string) (ProcessNode, error)

ProcessNodeFromString deserializes a ProcessNode.

func (ProcessNode) MarshalText

func (n ProcessNode) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler for ProcessNode

func (ProcessNode) String

func (n ProcessNode) String() string

String serializes a ProcessNode.

func (*ProcessNode) UnmarshalText

func (n *ProcessNode) UnmarshalText(text []byte) (err error)

UnmarshalText implements encoding.TextUnmarshaler for ProcessNode

type ProcessNodeEnd

// ProcessNodeEnd signals the end of processing by a process node, together
// with the processing status it ended with.
type ProcessNodeEnd struct {
	Node   ProcessNode
	Status api.ActorProcessingState_Status
}

ProcessNodeEnd signals the end of processing by a process node

type ProcessNodeIOStatus

// ProcessNodeIOStatus reflects the status of the inputs or outputs of a node.
// Its values are the ProcessNodeIO* constants.
type ProcessNodeIOStatus int

ProcessNodeIOStatus reflects the status of the inputs or outputs of a node

// Values of ProcessNodeIOStatus. They are numbered with iota, so the
// declaration order defines the numeric values and ProcessNodeIOPending is
// the zero value.
const (
	// ProcessNodeIOPending is when no io has started yet
	ProcessNodeIOPending ProcessNodeIOStatus = iota
	// ProcessNodeIOStarted is when one or more io started emitting
	ProcessNodeIOStarted
	// ProcessNodeIOAllStarted is when all io started emitting
	ProcessNodeIOAllStarted
	// ProcessNodeIOClosed is when all the io is closed without any transmission
	ProcessNodeIOClosed
	// ProcessNodeIODone is when all io finished emitting
	ProcessNodeIODone
	// ProcessNodeIOError is when an error occurred on one of the io
	ProcessNodeIOError
)

func (ProcessNodeIOStatus) MarshalText

func (i ProcessNodeIOStatus) MarshalText() ([]byte, error)

func (ProcessNodeIOStatus) String

func (i ProcessNodeIOStatus) String() string

func (*ProcessNodeIOStatus) UnmarshalText

func (i *ProcessNodeIOStatus) UnmarshalText(text []byte) error

type ProcessNodeIOStatusMap

// ProcessNodeIOStatusMap is a map of emission or reception statuses.
// NOTE(review): the string key is presumably the input or output name —
// confirm.
type ProcessNodeIOStatusMap map[string]ProcessNodeIOStatus

ProcessNodeIOStatusMap is a map of emission or reception statuses

type ProcessNodeInput

// ProcessNodeInput pairs an input name with an envelope id. It is the element
// type returned by ProcessStorage.GetInputs.
type ProcessNodeInput struct {
	Name       string
	EnvelopeID api.UUID
}

ProcessNodeInput describes which envelope goes on a given input

type ProcessNodeState

// ProcessNodeState reflects the current state of a process node.
type ProcessNodeState struct {
	// Status is the overall status of the node.
	Status         ProcessNodeStatus
	// NOTE(review): Error is presumably an error message set when Status
	// is ProcessNodeError — confirm.
	Error          string
	// Input / Output hold a single IO status for the inputs / outputs,
	// while InputStatuses / OutputStatuses hold a map of statuses.
	// NOTE(review): the maps are presumably keyed by input/output name and
	// the single values aggregated from them — confirm.
	Input          ProcessNodeIOStatus
	InputStatuses  ProcessNodeIOStatusMap
	Output         ProcessNodeIOStatus
	OutputStatuses ProcessNodeIOStatusMap
}

ProcessNodeState reflects the current state of a process node.

type ProcessNodeStateMap

// ProcessNodeStateMap contains the states of process nodes, keyed by
// ProcessNode. It is the type of ProcessState.NodeStates.
type ProcessNodeStateMap map[ProcessNode]ProcessNodeState

ProcessNodeStateMap contains states of process nodes

func (ProcessNodeStateMap) MarshalYAML

func (m ProcessNodeStateMap) MarshalYAML() (interface{}, error)

MarshalYAML as an ordered dict

type ProcessNodeStatus

// ProcessNodeStatus is the process node status. Its values are
// ProcessNodeStatusPending, ProcessNodeActive, ProcessNodeDone,
// ProcessNodeUnreachable and ProcessNodeError.
type ProcessNodeStatus int

ProcessNodeStatus is the process node status

// Values of ProcessNodeStatus. They are numbered with iota, so the
// declaration order defines the numeric values and ProcessNodeStatusPending
// is the zero value.
const (
	// ProcessNodeStatusPending is before node activation
	ProcessNodeStatusPending ProcessNodeStatus = iota
	// ProcessNodeActive is when any input or output is active
	ProcessNodeActive
	// ProcessNodeDone is when all inputs/outputs are done transmitting
	ProcessNodeDone
	// ProcessNodeUnreachable is when every path leading to the node
	// is closed and the node will never be active
	ProcessNodeUnreachable
	// ProcessNodeError is when an error occurred on the node
	ProcessNodeError
)

func (ProcessNodeStatus) MarshalText

func (i ProcessNodeStatus) MarshalText() ([]byte, error)

func (ProcessNodeStatus) String

func (i ProcessNodeStatus) String() string

func (*ProcessNodeStatus) UnmarshalText

func (i *ProcessNodeStatus) UnmarshalText(text []byte) error

type ProcessState

// ProcessState reflects the current state of a process execution. It is
// completely updated by applying incoming log entries.
type ProcessState struct {
	// Status is the process status; StatusChanged and StatusReason carry a
	// timestamp and a textual reason alongside it.
	Status            api.Process_Status
	StatusChanged     time.Time
	StatusReason      string
	// NodeStates holds the state of each process node, with
	// NodeStatesChanged as the associated timestamp.
	NodeStates        ProcessNodeStateMap
	NodeStatesChanged time.Time

	// NOTE(review): ResponseEnvelopeID is presumably the envelope sent
	// back as the reply, and ResultAcked presumably set once an AckResult
	// log entry was applied — confirm.
	ResponseEnvelopeID api.UUID
	ResultAcked        bool
}

ProcessState reflects the current state of a process execution. It is completely updated by applying incoming log entries.

func (ProcessState) Fmt

func (s ProcessState) Fmt() string

Fmt returns a formatted string of the state

type ProcessStorage

// ProcessStorage persists process data: the processes themselves, their
// routing targets and inputs, their state, their log entries and their
// post-mortem state.
type ProcessStorage interface {
	// Process lookups, by id or by triggering emitter and envelope.
	// NOTE(review): the bool result presumably reports whether the process
	// was found — confirm.
	GetProcess(api.UUID) (Process, bool)
	GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (Process, bool)

	Query(filter api.ProcessFilter) []Process

	// NOTE(review): the returned map presumably associates each envelope id
	// with the id of its process — confirm.
	FindProcessForEnvelope(envelopeIDs []api.UUID) map[api.UUID]api.UUID

	CreateProcess(Process) (api.UUID, error)
	SetPipeline(processID api.UUID, pipelineID api.UUID, sourceOutput NodeOutputRef)

	// GetTargets returns envelope targets for an envelope emitted on the
	// given source output of a process. The caller supplies calcTargets.
	// NOTE(review): calcTargets is presumably only invoked when the targets
	// are not stored yet, and the meaning of its bool result and of
	// noRouteTableUpdate is not visible here — confirm.
	GetTargets(
		processID api.UUID,
		source NodeOutputRef,
		envelopeID api.UUID,
		noRouteTableUpdate bool,
		calcTargets func() ([]api.EnvelopeTarget, bool, error),
	) ([]api.EnvelopeTarget, error)

	// GetInputs returns the (input name, envelope id) pairs of a process
	// node.
	GetInputs(
		processID api.UUID,
		targetNode ProcessNode,
	) []ProcessNodeInput

	UpdateReceiveStatus(reception Reception, status ReceptionStatus) error

	ProcessNodeEnd(
		process api.UUID, node ProcessNode,
		status api.ActorProcessingState_Status) error

	// Process state access. GetStateUpdater returns a StateUpdater, which
	// provides exclusive access to the state of the process.
	// NOTE(review): the string argument of SetStatus is presumably the
	// status reason — confirm.
	GetState(api.UUID) ProcessState
	GetStateUpdater(api.UUID) StateUpdater
	SetStatus(api.UUID, api.Process_Status, string) error

	// Generates an AckResult process log entry
	AckResult(api.UUID)

	SetEventHandlers(onStatusChanged ProcessEventStatusHandler)
	SetProcessLogEntryHandler(ProcessLogEntryHandler)

	GetUnprocessedProcessLogEntries() []ProcessLogEntry
	GetProcessLogEntries(api.UUID) []ProcessLogEntry

	// Post-mortem queries and state access.
	QueryPostMortem(level api.LogLevel, status ...api.PMProcess_Status) []Process
	GetPostMortemState(api.UUID) PostMortemState
	SetPostMortemState(api.UUID, PostMortemState)

	// PurgeProcess removes all data of some processes.
	// The processes must be terminated.
	// Returns the list of processes purged. If some ids are missing, those
	// processes were non-existent or had a non-terminal status.
	PurgeProcess([]api.UUID) ([]api.UUID, error)
}

ProcessStorage persists process data

type Reception

// Reception identifies a reception of an envelope from an actor.
type Reception struct {
	// ProcessID and EnvelopeID identify the process and the envelope
	// being received.
	ProcessID       api.UUID
	EnvelopeID      api.UUID
	// SourceActor and SourceOutputRef designate the source: an actor id
	// and a node output.
	SourceActor     api.UUID
	SourceOutputRef NodeOutputRef
}

Reception identifies a reception of an envelope from an actor

type ReceptionStatus

// ReceptionStatus describes the status of an envelope reception in the
// context of a process. Its values are the Reception* constants.
type ReceptionStatus int

ReceptionStatus describes the status of an envelope reception in the context of a process

// Values of ReceptionStatus. They are numbered with iota, so the declaration
// order defines the numeric values and ReceptionPending is the zero value.
const (
	// ReceptionPending is when envelope reception has not yet started
	ReceptionPending ReceptionStatus = iota
	// ReceptionOngoing is when part(s) of the envelope were received, but not all
	ReceptionOngoing
	// ReceptionDone is when the complete envelope has been received
	ReceptionDone
	// ReceptionError is when a reception error occurred
	ReceptionError
	// ReceptionClosed is when no envelope is to be expected
	ReceptionClosed
)

func (ReceptionStatus) MarshalText

func (s ReceptionStatus) MarshalText() ([]byte, error)

MarshalText serializes a ReceptionStatus to text

func (ReceptionStatus) String

func (s ReceptionStatus) String() string

func (*ReceptionStatus) UnmarshalText

func (s *ReceptionStatus) UnmarshalText(data []byte) error

UnmarshalText parses a ReceptionStatus

type Session

// Session links a SessionToken and an Account. It embeds api.SessionToken
// (so its fields are promoted) and adds the id of the owning account.
type Session struct {
	api.SessionToken
	AccountID api.UUID
}

Session links a SessionToken and an Account

type SessionStorage

// SessionStorage stores Sessions, addressed by their token string.
type SessionStorage interface {
	List() ([]Session, error)
	// NOTE(review): what Get returns for an unknown token (nil session,
	// an error, or both) is not visible here — confirm.
	Get(token string) (*Session, error)
	Set(Session) error
	Delete(token string) error
	// DeleteOlderThan deletes sessions older than the given time.
	// NOTE(review): which timestamp of the session is compared is not
	// visible here. The parameter name shadows the time package inside
	// implementations that keep it.
	DeleteOlderThan(time time.Time) error
}

SessionStorage stores Sessions

type SourceMatch

// SourceMatch holds the rules incoming envelopes are matched against. It is
// referenced by Node.SourceMatch, which is defined only for emitter nodes.
type SourceMatch struct {
	EventTypes []string // All event types envelope must contain exactly
}

SourceMatch holds the rules that incoming envelopes are matched against.

type StateUpdater

// StateUpdater provides exclusive access to a process state. It is obtained
// from ProcessStorage.GetStateUpdater.
// NOTE(review): the exclusive access is presumably held from the moment the
// updater is obtained until Unlock is called, so Unlock must always be
// called — confirm against the implementations.
type StateUpdater interface {
	// Get and Set read and write the ProcessState.
	Get() (ProcessState, error)
	Set(ProcessState) error
	// MarkLogEntry marks the process log entry with the given id.
	// NOTE(review): it presumably marks the entry as processed, asError
	// flagging a failed application — confirm.
	MarkLogEntry(id string, asError bool) error
	Unlock() error
}

StateUpdater provides exclusive access to a process state

Directories

Path Synopsis
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL