pipeline

package
v0.0.0-...-bfef3a7
Warning: This package is not in the latest version of its module.
Published: May 9, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

Functions

func Normalize

func Normalize(eng Engine, delta cache.Delta) (cache.Delta, error)

Types

type Aggregation

type Aggregation struct {
	*opv1a1.Aggregation
	Stages []*Stage
	// contains filtered or unexported fields
}

Aggregation is an operation that can be used to process objects, or to alter the shape of a list of objects, in a view.

func NewAggregation

func NewAggregation(engine Engine, config *opv1a1.Aggregation) *Aggregation

NewAggregation creates a new aggregation from a serialized representation.

func (*Aggregation) Evaluate

func (a *Aggregation) Evaluate(delta cache.Delta) ([]cache.Delta, error)

Evaluate processes an aggregation expression on the given delta.
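
Since an Aggregation holds a list of Stages and every Evaluate returns a slice of deltas, one plausible reading is that stages run in order and each delta produced by one stage is fed to the next. The sketch below illustrates that chaining under this assumption; it is not taken from the package source. Delta, stageFunc, runStages, keepNamespace and addLabel are hypothetical stand-ins, not names from this package.

```go
package main

import "fmt"

// Delta is a hypothetical stand-in for cache.Delta: an event type plus an object.
type Delta struct {
	Type   string
	Object map[string]any
}

// stageFunc is a hypothetical stand-in for a Stage: one delta in, zero or more out.
type stageFunc func(Delta) ([]Delta, error)

// runStages feeds the input through the stages in order. Every delta produced
// by one stage becomes an input of the next; the first error aborts the run.
// This ordering is an assumption of the sketch, not documented behavior.
func runStages(stages []stageFunc, in Delta) ([]Delta, error) {
	current := []Delta{in}
	for i, stage := range stages {
		var next []Delta
		for _, d := range current {
			out, err := stage(d)
			if err != nil {
				return nil, fmt.Errorf("stage %d: %w", i, err)
			}
			next = append(next, out...)
		}
		current = next
	}
	return current, nil
}

// keepNamespace drops deltas whose object is not in the given namespace.
func keepNamespace(ns string) stageFunc {
	return func(d Delta) ([]Delta, error) {
		if d.Object["namespace"] != ns {
			return nil, nil
		}
		return []Delta{d}, nil
	}
}

// addLabel returns a copy of the delta with one extra field set.
func addLabel(key, value string) stageFunc {
	return func(d Delta) ([]Delta, error) {
		obj := make(map[string]any, len(d.Object)+1)
		for k, v := range d.Object {
			obj[k] = v
		}
		obj[key] = value
		return []Delta{{Type: d.Type, Object: obj}}, nil
	}
}

func main() {
	stages := []stageFunc{keepNamespace("default"), addLabel("processed", "true")}
	in := Delta{Type: "Added", Object: map[string]any{"namespace": "default", "name": "a"}}
	out, err := runStages(stages, in)
	fmt.Println(len(out), err)
}
```

Returning an empty slice from a stage is how this sketch models a filter that suppresses an event; later stages then simply see no input.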

func (*Aggregation) String

func (a *Aggregation) String() string

type Engine

type Engine interface {
	// EvaluateJoin evaluates a join expression.
	EvaluateJoin(j *Join, delta cache.Delta) ([]cache.Delta, error)
	// EvaluateAggregation evaluates an aggregation pipeline.
	EvaluateAggregation(a *Aggregation, delta cache.Delta) ([]cache.Delta, error)
	// EvaluateStage evaluates a single aggregation stage.
	EvaluateStage(s *Stage, delta cache.Delta) ([]cache.Delta, error)
	// IsValidEvent returns false for some invalid events, like null-events or duplicate
	// events.
	IsValidEvent(cache.Delta) bool
	// View returns the target view of the engine.
	View() string
	// WithObjects sets some base objects in the cache for testing.
	WithObjects(objects ...object.Object)
	// Log returns a logger.
	Log() logr.Logger
}
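
The IsValidEvent comment mentions null events and duplicate events without defining them. As a rough illustration only, the sketch below treats a delta without an object as a null event and a delta that repeats the last remembered object for the same key as a duplicate. These definitions, and the Delta, validator and isValidEvent names, are assumptions of the example and may differ from what the default engine does.

```go
package main

import (
	"fmt"
	"reflect"
)

// Delta is a hypothetical stand-in for cache.Delta.
type Delta struct {
	Type   string         // assumed event names: "Added", "Updated", "Deleted"
	Key    string         // hypothetical unique object key
	Object map[string]any // nil marks a null event in this sketch
}

// String gives a compact form for logging.
func (d Delta) String() string {
	return fmt.Sprintf("%s %s", d.Type, d.Key)
}

// validator remembers the last object seen per key so it can spot repeats.
type validator struct {
	seen map[string]map[string]any
}

func newValidator() *validator {
	return &validator{seen: make(map[string]map[string]any)}
}

// isValidEvent reports false for null events and for events that would not
// change the remembered state. Valid events update that state.
func (v *validator) isValidEvent(d Delta) bool {
	if d.Object == nil {
		return false // null event
	}
	prev, known := v.seen[d.Key]
	if d.Type == "Deleted" {
		if !known {
			return false // deleting something never seen changes nothing
		}
		delete(v.seen, d.Key)
		return true
	}
	if known && reflect.DeepEqual(prev, d.Object) {
		return false // duplicate of the last event for this key
	}
	v.seen[d.Key] = d.Object
	return true
}

func main() {
	v := newValidator()
	d := Delta{Type: "Added", Key: "default/a", Object: map[string]any{"replicas": 1}}
	fmt.Println(d, v.isValidEvent(d), v.isValidEvent(d))
}
```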

func NewDefaultEngine

func NewDefaultEngine(targetView string, baseviews []gvk, log logr.Logger) Engine

type ErrAggregation

type ErrAggregation = error

func NewAggregationError

func NewAggregationError(err error) ErrAggregation

type ErrInvalidObject

type ErrInvalidObject = error

func NewInvalidObjectError

func NewInvalidObjectError(message string) ErrInvalidObject

type ErrJoin

type ErrJoin = error

func NewJoinError

func NewJoinError(err error) ErrJoin
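
ErrAggregation, ErrInvalidObject and ErrJoin are declared with `=`, so they are aliases of the built-in error type rather than distinct types. A consequence worth keeping in mind is that a type assertion cannot tell them apart. The sketch below demonstrates this with local copies of two alias declarations. The constructors newJoinError and newAggregationError and the errBadInput value are hypothetical stand-ins, since the real constructors' wrapping behavior is not shown in this listing.

```go
package main

import (
	"errors"
	"fmt"
)

// Alias declarations copied from the listing: each name is just another
// spelling of the built-in error type.
type ErrJoin = error
type ErrAggregation = error

// errBadInput is a hypothetical root cause used only in this sketch.
var errBadInput = errors.New("bad input")

// newJoinError and newAggregationError are hypothetical stand-ins; the real
// constructors may format or wrap their argument differently.
func newJoinError(err error) ErrJoin {
	return fmt.Errorf("join: %w", err)
}

func newAggregationError(err error) ErrAggregation {
	return fmt.Errorf("aggregation: %w", err)
}

// looksLikeJoinError shows the pitfall: asserting to an alias of error
// succeeds for every non-nil error, whichever constructor built it.
func looksLikeJoinError(err error) bool {
	_, ok := err.(ErrJoin)
	return ok
}

// causedByBadInput inspects the wrapped chain instead, which does
// distinguish errors by their cause.
func causedByBadInput(err error) bool {
	return errors.Is(err, errBadInput)
}

func main() {
	aggErr := newAggregationError(errBadInput)
	fmt.Println(looksLikeJoinError(aggErr)) // true: the alias carries no type information
	fmt.Println(causedByBadInput(aggErr))   // true: the cause is found through the %w chain
}
```

Callers that need to branch on the error category would therefore have to rely on something other than the alias name, for example a sentinel value matched with errors.Is, provided the constructors wrap one.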

type Evaluator

type Evaluator interface {
	Evaluate(cache.Delta) ([]cache.Delta, error)
	fmt.Stringer
}

Evaluator is a query that knows how to evaluate itself on a given delta and how to print itself.
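
To make the contract concrete, here is a minimal sketch of a type that satisfies an interface of the same shape. It uses a local Delta stand-in instead of cache.Delta so that it compiles on its own. The renamer type and its behavior are invented for illustration and do not exist in this package.

```go
package main

import "fmt"

// Delta is a hypothetical stand-in for cache.Delta.
type Delta struct {
	Type string
	Name string
}

// Evaluator mirrors the shape of the interface above, using the stand-in Delta.
type Evaluator interface {
	Evaluate(Delta) ([]Delta, error)
	fmt.Stringer
}

// renamer is a toy evaluator that prefixes object names.
type renamer struct {
	prefix string
}

// Compile-time check that *renamer satisfies Evaluator.
var _ Evaluator = (*renamer)(nil)

// Evaluate returns one renamed delta, or an error for a nameless input.
func (r *renamer) Evaluate(d Delta) ([]Delta, error) {
	if d.Name == "" {
		return nil, fmt.Errorf("%s: delta has no name", r)
	}
	return []Delta{{Type: d.Type, Name: r.prefix + d.Name}}, nil
}

// String describes the evaluator, which is useful in logs and error messages.
func (r *renamer) String() string {
	return fmt.Sprintf("rename(prefix=%q)", r.prefix)
}

func main() {
	var e Evaluator = &renamer{prefix: "view-"}
	out, err := e.Evaluate(Delta{Type: "Added", Name: "pod-a"})
	fmt.Println(e, out, err)
}
```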

func NewPipeline

func NewPipeline(target string, sources []gvk, config opv1a1.Pipeline, log logr.Logger) (Evaluator, error)

NewPipeline creates a new pipeline from the set of base objects and a serialized pipeline. The resulting pipeline writes into the given target.

type Join

type Join struct {
	*opv1a1.Join
	// contains filtered or unexported fields
}

Join is an operation that can be used to perform an inner Join on a list of views.

func NewJoin

func NewJoin(engine Engine, config *opv1a1.Join) *Join

NewJoin creates a new join from a serialized representation.

func (*Join) Evaluate

func (j *Join) Evaluate(delta cache.Delta) ([]cache.Delta, error)

Evaluate processes a join expression on the given delta. It returns the resulting deltas on success and an error otherwise.

func (*Join) String

func (j *Join) String() string

type Pipeline

type Pipeline struct {
	*Join
	*Aggregation
	// contains filtered or unexported fields
}

Pipeline is a query that knows how to evaluate itself.

func (*Pipeline) Evaluate

func (p *Pipeline) Evaluate(delta cache.Delta) ([]cache.Delta, error)

Evaluate processes a pipeline expression on the given delta.
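
Pipeline embeds both *Join and *Aggregation, and each of them has Evaluate and String methods. In Go, methods with the same name at the same embedding depth are not promoted, which is why Pipeline has to declare its own. The sketch below shows that pattern with local stand-in types. The control flow (an optional join first, then the aggregation applied to each joined delta) is an assumption made for illustration, not a description of the actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// Delta is a hypothetical stand-in for cache.Delta.
type Delta struct{ Name string }

// join is a toy stand-in that enriches the name, as a join might add fields.
type join struct{ suffix string }

func (j *join) Evaluate(d Delta) ([]Delta, error) {
	return []Delta{{Name: d.Name + j.suffix}}, nil
}
func (j *join) String() string { return "join" }

// aggregation is a toy stand-in that keeps only names with a given prefix.
type aggregation struct{ keepPrefix string }

func (a *aggregation) Evaluate(d Delta) ([]Delta, error) {
	if !strings.HasPrefix(d.Name, a.keepPrefix) {
		return nil, nil
	}
	return []Delta{d}, nil
}
func (a *aggregation) String() string { return "aggregation" }

// pipeline embeds both parts. Evaluate and String exist on both embedded
// types at the same depth, so neither is promoted and pipeline defines its own.
type pipeline struct {
	*join
	*aggregation
}

// Evaluate runs the join if present, then the aggregation on each result.
func (p *pipeline) Evaluate(d Delta) ([]Delta, error) {
	deltas := []Delta{d}
	if p.join != nil {
		var err error
		if deltas, err = p.join.Evaluate(d); err != nil {
			return nil, fmt.Errorf("join: %w", err)
		}
	}
	if p.aggregation == nil {
		return deltas, nil
	}
	var out []Delta
	for _, jd := range deltas {
		res, err := p.aggregation.Evaluate(jd)
		if err != nil {
			return nil, fmt.Errorf("aggregation: %w", err)
		}
		out = append(out, res...)
	}
	return out, nil
}

// String lists the parts that are configured.
func (p *pipeline) String() string {
	var parts []string
	if p.join != nil {
		parts = append(parts, p.join.String())
	}
	if p.aggregation != nil {
		parts = append(parts, p.aggregation.String())
	}
	return strings.Join(parts, " | ")
}

func main() {
	p := &pipeline{join: &join{suffix: "+svc"}, aggregation: &aggregation{keepPrefix: "pod"}}
	out, err := p.Evaluate(Delta{Name: "pod-a"})
	fmt.Println(p, out, err)
}
```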

func (*Pipeline) String

func (p *Pipeline) String() string

type Stage

type Stage struct {
	*expression.Expression
	// contains filtered or unexported fields
}

Stage is a single operation in an aggregation.

func NewStage

func NewStage(engine Engine, e *expression.Expression) *Stage

NewStage creates a new stage from a single aggregation stage.

func (*Stage) Evaluate

func (s *Stage) Evaluate(delta cache.Delta) ([]cache.Delta, error)

Evaluate processes an aggregation stage on the given delta.

func (*Stage) String

func (s *Stage) String() string
