Overview
Package pipeline implements declarative data processing pipelines that transform Kubernetes resources using DBSP-based incremental computation.
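Pipelines define how source Kubernetes resources are joined, filtered, and aggregated to produce target view objects. They support relational operations such as joins, selections, and aggregations while keeping the views up to date incrementally: a change to a source resource is propagated through the pipeline as a delta instead of recomputing every view from scratch.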
Pipeline operations:
- @join: Combine multiple resource types with boolean conditions.
- @select: Filter objects based on boolean expressions.
- @project: Transform object structure and extract fields.
- @unwind: Expand array fields into multiple objects.
- @gather: Collect multiple objects into aggregated results.
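The operations above run on Z-sets, the data model DBSP uses: each document carries an integer weight, and an update is itself a Z-set of +1/-1 changes that is added to the current state. The following is a minimal sketch of that idea in plain Go; `ZSet`, `Add`, and `Merge` are illustrative names, not this package's `dbsp.DocumentZSet` API, and documents are reduced to strings.

```go
package main

import "fmt"

// ZSet maps a document (here a string standing in for a serialized object)
// to an integer weight. Positive weights mean "present", negative weights
// mean "retracted"; entries whose weight reaches 0 are removed.
type ZSet map[string]int

// Add accumulates weight w for doc, dropping the entry if it cancels out.
func (z ZSet) Add(doc string, w int) {
	z[doc] += w
	if z[doc] == 0 {
		delete(z, doc)
	}
}

// Merge adds every entry of other into z (Z-set addition). Applying a delta
// to the current state is exactly this operation.
func (z ZSet) Merge(other ZSet) {
	for doc, w := range other {
		z.Add(doc, w)
	}
}

func main() {
	state := ZSet{"pod-a": 1, "pod-b": 1}
	// One incremental update: pod-b is deleted and pod-c is added.
	delta := ZSet{"pod-b": -1, "pod-c": 1}
	state.Merge(delta)
	fmt.Println(state) // map[pod-a:1 pod-c:1]
}
```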
Example usage:
    p, err := pipeline.NewPipeline("my-op", "TargetView", sources,
        opv1a1.Pipeline{
            Join:        &opv1a1.JoinExpression{...},
            Aggregation: &opv1a1.AggregationExpression{...},
            ...
        }, logger)
    if err != nil {
        // handle the error
    }
Index
- Variables
- type ErrAggregation
- type ErrInvalidObject
- type ErrJoin
- type ErrPipeline
- type Evaluator
- type GatherOp
- type JoinOp
- type Pipeline
- func (p *Pipeline) ConvertDeltaToZSet(delta object.Delta) (*dbsp.DocumentZSet, error)
- func (p *Pipeline) ConvertZSetToDelta(zset *dbsp.DocumentZSet, view string) ([]object.Delta, error)
- func (p *Pipeline) Evaluate(delta object.Delta) ([]object.Delta, error)
- func (p *Pipeline) NewGatherOp(e *expression.Expression) (dbsp.Operator, error)
- func (p *Pipeline) NewJoinOp(e *expression.Expression, sources []schema.GroupVersionKind) dbsp.Operator
- func (p *Pipeline) NewProjectionOp(e *expression.Expression) dbsp.Operator
- func (p *Pipeline) NewSelectionOp(e *expression.Expression) dbsp.Operator
- func (p *Pipeline) NewUnwindOp(e *expression.Expression) (dbsp.Operator, error)
- func (p *Pipeline) Reconcile(ds []object.Delta) ([]object.Delta, error)
- func (p *Pipeline) String() string
- type ProjectionOp
- type SelectionOp
- type UnwindOp
Constants
This section is empty.
Variables
var ObjectKey = toolscache.MetaObjectToName
Functions
This section is empty.
Types
type ErrAggregation
type ErrAggregation = error
func NewAggregationError
func NewAggregationError(err error) ErrAggregation
type ErrInvalidObject
type ErrInvalidObject = error
func NewInvalidObjectError
func NewInvalidObjectError(message string) ErrInvalidObject
type ErrPipeline
type ErrPipeline = error
func NewPipelineError
func NewPipelineError(err error) ErrPipeline
type Evaluator
Evaluator is a query that knows how to evaluate itself on a given delta and how to print itself.
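A minimal sketch of the contract described above, assuming its method set mirrors `Pipeline.Evaluate` and `Pipeline.String` from the index. `Delta`, `prefixFilter`, and the `"Upsert"` type value are hypothetical simplifications, not `object.Delta` or any operator in this package.

```go
package main

import (
	"fmt"
	"strings"
)

// Delta is a hypothetical, simplified stand-in for object.Delta.
type Delta struct {
	Type   string // e.g. "Upsert" or "Delete" (illustrative values)
	Object string // object name
}

// Evaluator evaluates itself on a delta and prints itself.
type Evaluator interface {
	Evaluate(d Delta) ([]Delta, error)
	String() string
}

// prefixFilter is a toy Evaluator that passes through deltas whose object
// name starts with prefix and drops all others.
type prefixFilter struct{ prefix string }

func (f prefixFilter) Evaluate(d Delta) ([]Delta, error) {
	if strings.HasPrefix(d.Object, f.prefix) {
		return []Delta{d}, nil
	}
	return nil, nil
}

func (f prefixFilter) String() string {
	return fmt.Sprintf("@select(prefix=%q)", f.prefix)
}

func main() {
	var e Evaluator = prefixFilter{prefix: "web-"}
	out, err := e.Evaluate(Delta{Type: "Upsert", Object: "web-1"})
	fmt.Println(e, len(out), err) // @select(prefix="web-") 1 <nil>
}
```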
type Pipeline
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a query that knows how to evaluate itself.
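func (*Pipeline) ConvertDeltaToZSet
func (p *Pipeline) ConvertDeltaToZSet(delta object.Delta) (*dbsp.DocumentZSet, error)
ConvertDeltaToZSet converts a delta into a ZSet that can be passed to DBSP.
func (*Pipeline) ConvertZSetToDelta
func (p *Pipeline) ConvertZSetToDelta(zset *dbsp.DocumentZSet, view string) ([]object.Delta, error)
ConvertZSetToDelta converts a ZSet returned by DBSP into a list of deltas.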
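One plausible way to picture the delta-to-ZSet direction: an upsert of an object seen before must retract the old version (-1) and insert the new one (+1), while a delete only retracts. This sketch assumes the converter keeps a cache of the last seen version per key to find the old version; that cache, `Converter`, `ToZSet`, and the simplified `Delta` type are hypothetical and may not match how this package tracks previous state.

```go
package main

import "fmt"

type DeltaType string

const (
	Upsert DeltaType = "Upsert"
	Delete DeltaType = "Delete"
)

// Delta is a hypothetical simplified delta: an object key and its body.
type Delta struct {
	Type DeltaType
	Key  string // e.g. "namespace/name"
	Doc  string // stand-in for the serialized object
}

// ZSet maps a document to its integer weight.
type ZSet map[string]int

// Converter remembers the last seen document per key so that updates and
// deletes can retract the previous version.
type Converter struct{ cache map[string]string }

func NewConverter() *Converter { return &Converter{cache: map[string]string{}} }

func (c *Converter) ToZSet(d Delta) ZSet {
	z := ZSet{}
	if old, ok := c.cache[d.Key]; ok {
		z[old]-- // retract the previous version
	}
	switch d.Type {
	case Upsert:
		z[d.Doc]++ // insert the new version
		c.cache[d.Key] = d.Doc
	case Delete:
		delete(c.cache, d.Key)
	}
	for doc, w := range z {
		if w == 0 {
			delete(z, doc) // upsert with an unchanged body: no-op
		}
	}
	return z
}

func main() {
	c := NewConverter()
	fmt.Println(c.ToZSet(Delta{Upsert, "default/web", "web:v1"})) // map[web:v1:1]
	fmt.Println(c.ToZSet(Delta{Upsert, "default/web", "web:v2"})) // map[web:v1:-1 web:v2:1]
	fmt.Println(c.ToZSet(Delta{Delete, "default/web", ""}))       // map[web:v2:-1]
}
```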
func (*Pipeline) NewGatherOp
func (p *Pipeline) NewGatherOp(e *expression.Expression) (dbsp.Operator, error)
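Unlike selection and projection, an aggregation such as @gather is not linear, so a delta cannot simply be pushed through it. A common DBSP technique is to keep the integrated input, recompute the aggregate only for the groups a delta touches, and emit the old aggregate with weight -1 and the new one with +1. The sketch below shows that technique on plain strings; `Elem`, `Gather`, and the `group:[v1,v2]` output format are hypothetical and not necessarily how `NewGatherOp` is implemented.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Elem is a hypothetical flattened input document: a group key and a value
// to collect into that group's list.
type Elem struct{ Group, Value string }

// Gather keeps the integrated input as group -> value -> weight.
type Gather struct{ state map[string]map[string]int }

func NewGather() *Gather { return &Gather{state: map[string]map[string]int{}} }

// agg renders a group's collected values as "group:[v1,v2,...]" (sorted),
// or "" if the group is empty.
func (g *Gather) agg(group string) string {
	var vals []string
	for v, w := range g.state[group] {
		for i := 0; i < w; i++ {
			vals = append(vals, v)
		}
	}
	if len(vals) == 0 {
		return ""
	}
	sort.Strings(vals)
	return group + ":[" + strings.Join(vals, ",") + "]"
}

// Apply consumes an input delta and returns the output delta: for every
// touched group, the old aggregate at -1 and the new aggregate at +1.
func (g *Gather) Apply(delta map[Elem]int) map[string]int {
	out := map[string]int{}
	touched := map[string]bool{}
	for e := range delta {
		touched[e.Group] = true
	}
	for grp := range touched {
		if old := g.agg(grp); old != "" {
			out[old]--
		}
	}
	for e, w := range delta {
		if g.state[e.Group] == nil {
			g.state[e.Group] = map[string]int{}
		}
		g.state[e.Group][e.Value] += w
		if g.state[e.Group][e.Value] == 0 {
			delete(g.state[e.Group], e.Value)
		}
	}
	for grp := range touched {
		if nw := g.agg(grp); nw != "" {
			out[nw]++
		}
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k) // aggregate unchanged
		}
	}
	return out
}

func main() {
	g := NewGather()
	fmt.Println(g.Apply(map[Elem]int{{"svc-a", "10.0.0.1"}: 1, {"svc-a", "10.0.0.2"}: 1}))
	// map[svc-a:[10.0.0.1,10.0.0.2]:1]
	fmt.Println(g.Apply(map[Elem]int{{"svc-a", "10.0.0.1"}: -1}))
	// map[svc-a:[10.0.0.1,10.0.0.2]:-1 svc-a:[10.0.0.2]:1]
}
```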
func (*Pipeline) NewJoinOp
func (p *Pipeline) NewJoinOp(e *expression.Expression, sources []schema.GroupVersionKind) dbsp.Operator
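A join is bilinear in DBSP terms: for a delta ΔL on the left and ΔR on the right, the change in L⋈R is ΔL⋈R + (L+ΔL)⋈ΔR, where each matching pair gets the product of its weights. The sketch below applies that identity to a binary join under an arbitrary boolean condition; `IncJoin`, the string documents, and the `left|right` output encoding are hypothetical, and the operator built by `NewJoinOp` may join more than two sources.

```go
package main

import (
	"fmt"
	"strings"
)

// ZSet maps a document to its integer weight.
type ZSet map[string]int

func (z ZSet) add(doc string, w int) {
	z[doc] += w
	if z[doc] == 0 {
		delete(z, doc)
	}
}

// IncJoin incrementally maintains left ⋈ right under a boolean condition.
type IncJoin struct {
	left, right ZSet
	cond        func(l, r string) bool
}

func NewIncJoin(cond func(l, r string) bool) *IncJoin {
	return &IncJoin{left: ZSet{}, right: ZSet{}, cond: cond}
}

// join adds a ⋈ b to out; a matching pair's weight is the product of weights.
func (j *IncJoin) join(a, b, out ZSet) {
	for l, wl := range a {
		for r, wr := range b {
			if j.cond(l, r) {
				out.add(l+"|"+r, wl*wr)
			}
		}
	}
}

// Apply returns ΔL⋈R + (L+ΔL)⋈ΔR and folds both deltas into the stored inputs.
func (j *IncJoin) Apply(dl, dr ZSet) ZSet {
	out := ZSet{}
	j.join(dl, j.right, out) // left changes against the old right side
	for doc, w := range dl {
		j.left.add(doc, w)
	}
	j.join(j.left, dr, out) // updated left side against right changes
	for doc, w := range dr {
		j.right.add(doc, w)
	}
	return out
}

func main() {
	// Hypothetical condition: documents join when their "app/" prefixes match.
	sameApp := func(l, r string) bool {
		return strings.Split(l, "/")[0] == strings.Split(r, "/")[0]
	}
	j := NewIncJoin(sameApp)
	fmt.Println(j.Apply(ZSet{"web/pod-1": 1}, nil))  // map[]
	fmt.Println(j.Apply(nil, ZSet{"web/svc": 1}))    // map[web/pod-1|web/svc:1]
	fmt.Println(j.Apply(ZSet{"web/pod-1": -1}, nil)) // map[web/pod-1|web/svc:-1]
}
```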
func (*Pipeline) NewProjectionOp
func (p *Pipeline) NewProjectionOp(e *expression.Expression) dbsp.Operator
func (*Pipeline) NewSelectionOp
func (p *Pipeline) NewSelectionOp(e *expression.Expression) dbsp.Operator
func (*Pipeline) NewUnwindOp
func (p *Pipeline) NewUnwindOp(e *expression.Expression) (dbsp.Operator, error)
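Because @unwind maps each input document to a fixed set of output documents, it is linear and can be applied directly to a delta: every array element becomes its own output entry carrying the input's weight. In this sketch `Doc`, `Weighted`, `Unwound`, and the `Ports` field are hypothetical; note how an update from ports [80 443] to [80 8080] surfaces only the elements that actually changed.

```go
package main

import "fmt"

// Doc is a hypothetical document with one array field to unwind.
type Doc struct {
	Name  string
	Ports []int
}

// Weighted pairs a document with its Z-set weight (the slice field makes
// Doc unusable as a map key, so the input is a list of pairs).
type Weighted struct {
	Doc    Doc
	Weight int
}

// Unwound is one output document per array element.
type Unwound struct {
	Name string
	Port int
}

// Unwind emits one entry per array element, carrying the input weight,
// and drops entries whose weights cancel out.
func Unwind(in []Weighted) map[Unwound]int {
	out := map[Unwound]int{}
	for _, e := range in {
		for _, p := range e.Doc.Ports {
			out[Unwound{e.Doc.Name, p}] += e.Weight
		}
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

func main() {
	// An update expressed as a delta: retract the old version, add the new one.
	delta := []Weighted{
		{Doc{"web", []int{80, 443}}, -1},
		{Doc{"web", []int{80, 8080}}, +1},
	}
	fmt.Println(Unwind(delta)) // map[{web 443}:-1 {web 8080}:1]
}
```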
func (*Pipeline) Reconcile
func (p *Pipeline) Reconcile(ds []object.Delta) ([]object.Delta, error)
Reconcile turns a delta set containing only unordered(!) add/delete operations into a properly ordered(!) list of delete/upsert deltas.
DBSP outputs unordered ZSets, so for documents that map to the same primary key there is no way to know whether an add or a delete comes first, and the two orders yield different results. To remove this ambiguity, we maintain a target cache that holds the latest known state of the target view, and we process the (doc->+/-1) pairs of the ZSet result in any order. The rules are as follows:
- additions (doc->+1): extract the primary key from doc, immediately upsert doc into the cache under that key, and add an upsert delta to the result set, possibly overwriting any previous delta for the same key.
- deletions (doc->-1): extract the primary key from doc and fetch the current cache entry for that key. If there is no entry for the key, or the cached state equals the doc to be deleted, apply the delete to the cache and add a delete delta to the result set; otherwise the delete refers to a stale version, so drop it and move on.
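The rules above can be sketched as follows. `Doc`, `Entry`, `Delta`, and `Reconciler` are simplified, hypothetical types (the document body is reduced to a string), and emitting one delta per key in first-seen order is an assumption of this sketch. The usage shows that both arrival orders of an update's (old->-1, new->+1) pair produce the same single upsert.

```go
package main

import "fmt"

// Doc is a simplified target document: a primary key plus a body.
type Doc struct {
	Key   string // primary key, e.g. "namespace/name"
	Value string // stand-in for the document contents
}

type DeltaType string

const (
	Upsert DeltaType = "Upsert"
	Delete DeltaType = "Delete"
)

type Delta struct {
	Type DeltaType
	Doc  Doc
}

// Entry is one (doc -> +/-1) pair from an unordered Z-set result.
type Entry struct {
	Doc    Doc
	Weight int
}

// Reconciler holds the latest known state of the target view, by key.
type Reconciler struct{ cache map[string]Doc }

func (r *Reconciler) Reconcile(entries []Entry) []Delta {
	result := map[string]Delta{} // at most one delta per key
	var order []string           // keys in first-seen order, for determinism
	record := func(d Delta) {
		if _, seen := result[d.Doc.Key]; !seen {
			order = append(order, d.Doc.Key)
		}
		result[d.Doc.Key] = d
	}
	for _, e := range entries {
		key := e.Doc.Key
		switch {
		case e.Weight > 0: // addition: upsert unconditionally
			r.cache[key] = e.Doc
			record(Delta{Upsert, e.Doc})
		case e.Weight < 0: // deletion: only if it matches the cached state
			if cur, ok := r.cache[key]; !ok || cur == e.Doc {
				delete(r.cache, key)
				record(Delta{Delete, e.Doc})
			}
			// otherwise a newer doc was already upserted: drop the delete
		}
	}
	out := make([]Delta, 0, len(order))
	for _, k := range order {
		out = append(out, result[k])
	}
	return out
}

func main() {
	old := Doc{"default/web", "v1"}
	nw := Doc{"default/web", "v2"}
	for _, batch := range [][]Entry{
		{{old, -1}, {nw, +1}}, // delete arrives first
		{{nw, +1}, {old, -1}}, // add arrives first
	} {
		r := &Reconciler{cache: map[string]Doc{old.Key: old}}
		fmt.Println(r.Reconcile(batch)) // [{Upsert {default/web v2}}] both times
	}
}
```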
type ProjectionOp
type ProjectionOp struct {
// contains filtered or unexported fields
}
Projection operator.
func (*ProjectionOp) String
func (eval *ProjectionOp) String() string
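Projection is linear as well: each document is mapped to its projected form, and the weights of documents that project to the same result are summed. The sketch models @project as a plain Go function over a hypothetical `Pod` type; `Projected` and `Project` are also illustrative names. The example delta changes only a field that the projection discards, so the projected view sees no change at all.

```go
package main

import "fmt"

// Pod is a hypothetical source document.
type Pod struct {
	Name, Namespace, Image string
}

// Projected is the hypothetical projected shape: only Name and Image survive.
type Projected struct {
	Name, Image string
}

// Project maps every document through f and sums the weights of documents
// that project to the same result, dropping entries that cancel out.
func Project(in map[Pod]int, f func(Pod) Projected) map[Projected]int {
	out := map[Projected]int{}
	for doc, w := range in {
		out[f(doc)] += w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

func main() {
	f := func(p Pod) Projected { return Projected{p.Name, p.Image} }
	// A pod deleted in ns1 and recreated in ns2 with the same name and image.
	delta := map[Pod]int{
		{"web", "ns1", "nginx:1.25"}: -1,
		{"web", "ns2", "nginx:1.25"}: +1,
	}
	fmt.Println(len(Project(delta, f))) // 0: the projected view is unchanged
}
```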
type SelectionOp
type SelectionOp struct {
// contains filtered or unexported fields
}
Selection operator.
func (*SelectionOp) String
func (eval *SelectionOp) String() string
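Selection is the simplest linear operator: it keeps the entries whose document satisfies the predicate, with their weights unchanged, so it can be applied to a delta directly instead of to the whole view. In this sketch the predicate is an ordinary Go function standing in for a boolean expression, and `ZSet` and `Select` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// ZSet maps a document to its integer weight.
type ZSet map[string]int

// Select keeps the entries whose document satisfies pred; weights are
// passed through unchanged, so Select works on deltas as well as on state.
func Select(in ZSet, pred func(doc string) bool) ZSet {
	out := ZSet{}
	for doc, w := range in {
		if pred(doc) {
			out[doc] = w
		}
	}
	return out
}

func main() {
	running := func(doc string) bool { return strings.HasSuffix(doc, "phase=Running") }
	// A pod transitions from Pending to Running: retract old, insert new.
	delta := ZSet{"web-1 phase=Pending": -1, "web-1 phase=Running": 1}
	fmt.Println(Select(delta, running)) // map[web-1 phase=Running:1]
}
```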