pipeline

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package pipeline implements declarative data processing pipelines that transform Kubernetes resources using DBSP-based incremental computation.

Pipelines define how source Kubernetes resources are joined, filtered, and aggregated to produce target view objects. They support complex relational operations while maintaining incremental update semantics for efficiency.

Pipeline operations:

  • @join: Combine multiple resource types with boolean conditions.
  • @select: Filter objects based on boolean expressions.
  • @project: Transform object structure and extract fields.
  • @unwind: Expand array fields into multiple objects.
  • @gather: Collect multiple objects into aggregated results.

Example usage:

pipeline, _ := pipeline.NewPipeline("my-op", "TargetView", sources,
    opv1a1.Pipeline{
        Join: &opv1a1.JoinExpression{...},
        Aggregation: &opv1a1.AggregationExpression{...},
     ...
    }, logger)

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type ErrAggregation

type ErrAggregation = error

func NewAggregationError

func NewAggregationError(err error) ErrAggregation

type ErrInvalidObject

type ErrInvalidObject = error

func NewInvalidObjectError

func NewInvalidObjectError(message string) ErrInvalidObject

type ErrJoin

type ErrJoin = error

func NewJoinError

func NewJoinError(err error) ErrJoin

type ErrPipeline

type ErrPipeline = error

func NewPipelineError

func NewPipelineError(err error) ErrPipeline

type Evaluator

type Evaluator interface {
	Evaluate(object.Delta) ([]object.Delta, error)
	fmt.Stringer
}

Evaluator is a query that knows how to evaluate itself on a given delta and how to print itself.

func New

func New(operator string, target string, sources []schema.GroupVersionKind, config opv1a1.Pipeline, log logr.Logger) (Evaluator, error)

New creates a new pipeline from the set of base objects and a seralized pipeline that writes into a given target.

type GatherOp

type GatherOp struct {
	// contains filtered or unexported fields
}

Gather operator.

func (*GatherOp) String

func (eval *GatherOp) String() string

func (*GatherOp) Transform

func (eval *GatherOp) Transform(doc dbsp.Document, v any) (dbsp.Document, error)

type JoinOp

type JoinOp struct {
	// contains filtered or unexported fields
}

Join operator.

func (*JoinOp) Evaluate

func (eval *JoinOp) Evaluate(doc dbsp.Document) ([]dbsp.Document, error)

func (*JoinOp) String

func (eval *JoinOp) String() string

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is query that knows how to evaluate itself.

func (*Pipeline) ConvertDeltaToZSet

func (p *Pipeline) ConvertDeltaToZSet(delta object.Delta) (*dbsp.DocumentZSet, error)

ConvertDeltaToZSet converts a delta into a ZSet that can be passed to DBSP.

func (*Pipeline) ConvertZSetToDelta

func (p *Pipeline) ConvertZSetToDelta(zset *dbsp.DocumentZSet, view string) ([]object.Delta, error)

ConvertZSetToDelta converts a ZSet as returned by DBSP to a delta.

func (*Pipeline) Evaluate

func (p *Pipeline) Evaluate(delta object.Delta) ([]object.Delta, error)

Evaluate processes an pipeline on the given delta.

func (*Pipeline) NewGatherOp

func (p *Pipeline) NewGatherOp(e *expression.Expression) (dbsp.Operator, error)

func (*Pipeline) NewJoinOp

func (p *Pipeline) NewJoinOp(e *expression.Expression, sources []schema.GroupVersionKind) dbsp.Operator

func (*Pipeline) NewProjectionOp

func (p *Pipeline) NewProjectionOp(e *expression.Expression) dbsp.Operator

func (*Pipeline) NewSelectionOp

func (p *Pipeline) NewSelectionOp(e *expression.Expression) dbsp.Operator

func (*Pipeline) NewUnwindOp

func (p *Pipeline) NewUnwindOp(e *expression.Expression) (dbsp.Operator, error)

func (*Pipeline) Reconcile

func (p *Pipeline) Reconcile(ds []object.Delta) ([]object.Delta, error)

Reconcile processes a delta set containing only unrdered(!) add/delete ops into a proper ordered(!) delete/upsert delta list.

DBSP outputs onordered zsets so there is no way to know for documents that map to the same primary key whether an add or a delete comes first, and the two orders yield different results. To remove this ambiguity, we maintain a target cache that contains the latest known state of the target view and we take the (doc->+/-1) pairs in any order from the zset result set. The rules are as follows:

  • additions (doc->+1): we extract the primary key from doc and immediately upsert the doc into the cache with that key and add the upsert delta to our result set, possibly overwriting any previous delta for the same key.
  • deletions (doc->-1): we again extract the primary key from doc and first we fetch the current entry from the cache and check if doc==doc. If there is no entry in the cache for the key or the latest state equals the doc to be deleted, we add the delete to the cache and the result delta, otherwise we drop the delete event and move on.

func (*Pipeline) String

func (p *Pipeline) String() string

String stringifies a pipeline.

type ProjectionOp

type ProjectionOp struct {
	// contains filtered or unexported fields
}

Projection operator.

func (*ProjectionOp) Evaluate

func (eval *ProjectionOp) Evaluate(doc dbsp.Document) ([]dbsp.Document, error)

func (*ProjectionOp) String

func (eval *ProjectionOp) String() string

type SelectionOp

type SelectionOp struct {
	// contains filtered or unexported fields
}

Selection operator.

func (*SelectionOp) Evaluate

func (eval *SelectionOp) Evaluate(doc dbsp.Document) ([]dbsp.Document, error)

func (*SelectionOp) String

func (eval *SelectionOp) String() string

type UnwindOp

type UnwindOp struct {
	// contains filtered or unexported fields
}

Unwind operator.

func (*UnwindOp) Extract

func (eval *UnwindOp) Extract(doc dbsp.Document) (any, error)

func (*UnwindOp) String

func (eval *UnwindOp) String() string

func (*UnwindOp) Transform

func (eval *UnwindOp) Transform(doc dbsp.Document, v any) (dbsp.Document, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL