Documentation
¶
Index ¶
- func DeepCopyAny(val any) any
- func DeepEqual(a, b Document) (bool, error)
- type AddOp
- type AggregateInput
- type BaseOp
- type BinaryJoinOp
- type ChainGraph
- func (g *ChainGraph) AddInput(op *InputOp) string
- func (g *ChainGraph) AddToChain(op Operator) string
- func (g *ChainGraph) Arity() int
- func (g *ChainGraph) GetChain() []Operator
- func (g *ChainGraph) GetInput(name string) *InputOp
- func (g *ChainGraph) GetStartNode() string
- func (g *ChainGraph) SetJoin(op Operator) string
- func (g *ChainGraph) String() string
- func (g *ChainGraph) Validate() error
- type ConstantOp
- type DelayOp
- type DeltaZSet
- type DifferentiatorOp
- type DistinctOp
- type DistinctOptimizationRule
- type Document
- type DocumentEntry
- type DocumentZSet
- func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error)
- func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error)
- func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error
- func (dz *DocumentZSet) Contains(doc Document) (bool, error)
- func (dz *DocumentZSet) DeepCopy() *DocumentZSet
- func (dz *DocumentZSet) Distinct() (*DocumentZSet, error)
- func (dz *DocumentZSet) GetDocuments() ([]Document, error)
- func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error)
- func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error)
- func (dz *DocumentZSet) IsZero() bool
- func (dz *DocumentZSet) List() ([]DocumentEntry, error)
- func (dz *DocumentZSet) ShallowCopy() *DocumentZSet
- func (dz *DocumentZSet) Size() int
- func (dz *DocumentZSet) String() string
- func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error)
- func (dz *DocumentZSet) TotalSize() int
- func (dz *DocumentZSet) Unique() (*DocumentZSet, error)
- func (dz *DocumentZSet) UniqueCount() int
- type Evaluator
- type Executor
- type Extractor
- type FusedOp
- type GatherOp
- type GraphNode
- type GroupData
- type IncrementalBinaryJoinOp
- func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool
- func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool
- func (op *IncrementalBinaryJoinOp) OpType() OperatorType
- func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
- func (op *IncrementalBinaryJoinOp) Reset()
- type IncrementalExecutionContext
- type IncrementalGatherOp
- type IncrementalJoinOp
- type InputOp
- type IntegrationDifferentiationEliminationRule
- type IntegratorOp
- type JoinIncrementalizationRule
- type JoinOp
- type LinearChainIncrementalizationRule
- type LinearChainRewriteEngine
- type LinearChainRule
- type LinearOperatorFusionRule
- type NegateOp
- type Operator
- type OperatorType
- type ProjectThenSelectOp
- type ProjectionOp
- type SelectThenProjectionsOp
- type SelectionOp
- type SubtractOp
- type Transformer
- type UnwindOp
- type ZSetError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DeepCopyAny ¶
DeepCopyAny creates a deep copy of a document or any nested structure
Types ¶
type AddOp ¶
type AddOp struct {
BaseOp
}
Addition node
func (*AddOp) Process ¶
func (n *AddOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type AggregateInput ¶
type BaseOp ¶
type BaseOp struct {
// contains filtered or unexported fields
}
Base implementation for validation
type BinaryJoinOp ¶
type BinaryJoinOp struct { BaseOp // contains filtered or unexported fields }
Binary join
func NewBinaryJoin ¶
func NewBinaryJoin(eval Evaluator, inputs []string) *BinaryJoinOp
func (*BinaryJoinOp) HasZeroPreservationProperty ¶
func (n *BinaryJoinOp) HasZeroPreservationProperty() bool
func (*BinaryJoinOp) IsTimeInvariant ¶
func (n *BinaryJoinOp) IsTimeInvariant() bool
func (*BinaryJoinOp) OpType ¶
func (n *BinaryJoinOp) OpType() OperatorType
func (*BinaryJoinOp) Process ¶
func (n *BinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type ChainGraph ¶
type ChainGraph struct {
// contains filtered or unexported fields
}
ChainGraph represents the specialized graph structure directly. The shape is always: [Inputs] -> [Optional N-ary Join] -> [Chain of Linear Ops] -> [Output].
func NewChainGraph ¶
func NewChainGraph() *ChainGraph
func (*ChainGraph) AddInput ¶
func (g *ChainGraph) AddInput(op *InputOp) string
AddInput adds an input node
func (*ChainGraph) AddToChain ¶
func (g *ChainGraph) AddToChain(op Operator) string
AddToChain adds an operation to the linear chain
func (*ChainGraph) Arity ¶
func (g *ChainGraph) Arity() int
Arity returns the number of inputs of the chain.
func (*ChainGraph) GetChain ¶
func (g *ChainGraph) GetChain() []Operator
GetChain returns the operations along the linear chain.
func (*ChainGraph) GetInput ¶
func (g *ChainGraph) GetInput(name string) *InputOp
GetInput returns a named input of the chain.
func (*ChainGraph) GetStartNode ¶
func (g *ChainGraph) GetStartNode() string
GetStartNode returns the node where the linear chain begins
func (*ChainGraph) SetJoin ¶
func (g *ChainGraph) SetJoin(op Operator) string
SetJoin sets the join node (if multiple inputs)
func (*ChainGraph) String ¶
func (g *ChainGraph) String() string
String representation for debugging (horizontal layout)
func (*ChainGraph) Validate ¶
func (g *ChainGraph) Validate() error
Validate checks the graph structure
type ConstantOp ¶
type ConstantOp struct { BaseOp // contains filtered or unexported fields }
Constant node
func NewConstant ¶
func NewConstant(value *DocumentZSet, name string) *ConstantOp
func (*ConstantOp) Process ¶
func (n *ConstantOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type DelayOp ¶
type DelayOp struct { BaseOp // contains filtered or unexported fields }
DelayOp implements the z^(-1) operator: it delays the stream by one timestep. PROBLEMATIC: this is stateful and needs careful handling in the execution model.
func (*DelayOp) HasZeroPreservationProperty ¶
func (*DelayOp) IsTimeInvariant ¶
func (*DelayOp) OpType ¶
func (n *DelayOp) OpType() OperatorType
func (*DelayOp) Process ¶
func (n *DelayOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type DeltaZSet ¶
type DeltaZSet = map[string]*DocumentZSet
type DifferentiatorOp ¶
type DifferentiatorOp struct { BaseOp // contains filtered or unexported fields }
DifferentiatorOp implements the D operator, which converts snapshots to deltas: D(s)[t] = s[t] - s[t-1].
func NewDifferentiator ¶
func NewDifferentiator() *DifferentiatorOp
func (*DifferentiatorOp) HasZeroPreservationProperty ¶
func (n *DifferentiatorOp) HasZeroPreservationProperty() bool
func (*DifferentiatorOp) IsTimeInvariant ¶
func (n *DifferentiatorOp) IsTimeInvariant() bool
func (*DifferentiatorOp) OpType ¶
func (n *DifferentiatorOp) OpType() OperatorType
func (*DifferentiatorOp) Process ¶
func (n *DifferentiatorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
func (*DifferentiatorOp) Reset ¶
func (n *DifferentiatorOp) Reset()
Reset state (useful for testing or restarting computation)
type DistinctOp ¶
type DistinctOp struct {
BaseOp
}
Distinct node
func NewDistinct ¶
func NewDistinct() *DistinctOp
func (*DistinctOp) HasZeroPreservationProperty ¶
func (n *DistinctOp) HasZeroPreservationProperty() bool
func (*DistinctOp) IsTimeInvariant ¶
func (n *DistinctOp) IsTimeInvariant() bool
func (*DistinctOp) OpType ¶
func (n *DistinctOp) OpType() OperatorType
func (*DistinctOp) Process ¶
func (n *DistinctOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type DistinctOptimizationRule ¶
type DistinctOptimizationRule struct{}
Rule 4: Optimize distinct operations
func (*DistinctOptimizationRule) Apply ¶
func (r *DistinctOptimizationRule) Apply(graph *ChainGraph) error
func (*DistinctOptimizationRule) CanApply ¶
func (r *DistinctOptimizationRule) CanApply(graph *ChainGraph) bool
func (*DistinctOptimizationRule) Name ¶
func (r *DistinctOptimizationRule) Name() string
type Document ¶
Document represents an unstructured document as map[string]any. It can contain embedded maps, slices, and primitives (int64, float64, string, bool).
func DeepCopyDocument ¶
DeepCopyDocument creates a deep copy of a document.
type DocumentEntry ¶
DocumentEntry represents a document with its multiplicity in a Z-set.
type DocumentZSet ¶
type DocumentZSet struct {
// contains filtered or unexported fields
}
DocumentZSet implements Z-sets for atomic documents. Documents are treated as opaque units: no operations are performed on their internal structure.
func FromDocuments ¶
func FromDocuments(docs []Document) (*DocumentZSet, error)
FromDocuments creates a Z-set from a slice of documents (each with multiplicity 1)
func NewDocumentZSet ¶
func NewDocumentZSet() *DocumentZSet
NewDocumentZSet creates an empty DocumentZSet
func SingletonZSet ¶
func SingletonZSet(doc Document) (*DocumentZSet, error)
SingletonZSet creates a Z-set containing a single document with multiplicity 1
func (*DocumentZSet) Add ¶
func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error)
Add performs Z-set addition (union with multiplicity)
func (*DocumentZSet) AddDocument ¶
func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error)
AddDocument adds a document to the ZSet with given multiplicity by creating a new ZSet. This is the core operation for building Z-sets.
func (*DocumentZSet) AddDocumentMutate ¶
func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error
AddDocumentMutate adds a document to the ZSet with given multiplicity by modifying the ZSet in place.
func (*DocumentZSet) Contains ¶
func (dz *DocumentZSet) Contains(doc Document) (bool, error)
Contains checks if a document exists in the Z-set (with positive multiplicity)
func (*DocumentZSet) DeepCopy ¶
func (dz *DocumentZSet) DeepCopy() *DocumentZSet
DeepCopy creates a deep copy of the DocumentZSet
func (*DocumentZSet) Distinct ¶
func (dz *DocumentZSet) Distinct() (*DocumentZSet, error)
Distinct converts the Z-set to set semantics (all multiplicities become 1). This is crucial for converting from multiset to set semantics.
func (*DocumentZSet) GetDocuments ¶
func (dz *DocumentZSet) GetDocuments() ([]Document, error)
GetDocuments returns all documents as a slice (with multiplicities). Documents with multiplicity n appear n times in the result.
func (*DocumentZSet) GetMultiplicity ¶
func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error)
GetMultiplicity returns the multiplicity of a specific document
func (*DocumentZSet) GetUniqueDocuments ¶
func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error)
GetUniqueDocuments returns all unique documents (ignoring multiplicities)
func (*DocumentZSet) IsZero ¶
func (dz *DocumentZSet) IsZero() bool
IsZero checks if the Z-set is empty (no documents with positive multiplicity)
func (*DocumentZSet) List ¶
func (dz *DocumentZSet) List() ([]DocumentEntry, error)
List returns all documents with their multiplicities (including negative ones).
func (*DocumentZSet) ShallowCopy ¶
func (dz *DocumentZSet) ShallowCopy() *DocumentZSet
ShallowCopy creates a shallow copy of the DocumentZSet. Safe because documents are treated as immutable atomic units.
func (*DocumentZSet) Size ¶
func (dz *DocumentZSet) Size() int
Size returns the number of documents counting only positive multiplicities.
func (*DocumentZSet) String ¶
func (dz *DocumentZSet) String() string
String returns a string representation of the Z-set for debugging
func (*DocumentZSet) Subtract ¶
func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error)
Subtract performs Z-set subtraction
func (*DocumentZSet) TotalSize ¶
func (dz *DocumentZSet) TotalSize() int
TotalSize returns the total number of documents, counting both positive and negative multiplicities.
func (*DocumentZSet) Unique ¶
func (dz *DocumentZSet) Unique() (*DocumentZSet, error)
Unique converts Z-set to set semantics preserving multiplicity sign (all multiplicities become +/-1)
func (*DocumentZSet) UniqueCount ¶
func (dz *DocumentZSet) UniqueCount() int
UniqueCount returns number of unique documents (ignoring multiplicities)
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor executes incremental queries on the specialized linear chain graph
func NewExecutor ¶
func NewExecutor(graph *ChainGraph, log logr.Logger) (*Executor, error)
func (*Executor) GetExecutionPlan ¶
GetExecutionPlan returns a human-readable execution plan
func (*Executor) GetNodeResult ¶
func (e *Executor) GetNodeResult(nodeID string, deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)
GetNodeResult returns intermediate results for debugging (optional caching)
func (*Executor) ProcessDelta ¶
func (e *Executor) ProcessDelta(deltaInputs DeltaZSet) (*DocumentZSet, error)
ProcessDelta processes one delta input and produces a delta output. This is the core incremental execution method.
type FusedOp ¶
type FusedOp struct { BaseOp // contains filtered or unexported fields }
FusedOp is a naive fused op that just calls the Process function of each subsequent node along the chain. Currently unused.
func (*FusedOp) HasZeroPreservationProperty ¶
func (*FusedOp) IsTimeInvariant ¶
func (*FusedOp) OpType ¶
func (n *FusedOp) OpType() OperatorType
func (*FusedOp) Process ¶
func (n *FusedOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type GatherOp ¶
type GatherOp struct { BaseOp // contains filtered or unexported fields }
Snapshot Gather Operation (stateless)
func NewGather ¶
func NewGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *GatherOp
func (*GatherOp) HasZeroPreservationProperty ¶
func (*GatherOp) IsTimeInvariant ¶
func (*GatherOp) OpType ¶
func (op *GatherOp) OpType() OperatorType
func (*GatherOp) Process ¶
func (op *GatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
GatherOp processes each group and preserves input document content.
type IncrementalBinaryJoinOp ¶
type IncrementalBinaryJoinOp struct { BaseOp // contains filtered or unexported fields }
New stateful incremental join that composes 3 snapshot joins
func NewIncrementalBinaryJoin ¶
func NewIncrementalBinaryJoin(eval Evaluator, inputs []string) *IncrementalBinaryJoinOp
func (*IncrementalBinaryJoinOp) HasZeroPreservationProperty ¶
func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool
func (*IncrementalBinaryJoinOp) IsTimeInvariant ¶
func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool
func (*IncrementalBinaryJoinOp) OpType ¶
func (op *IncrementalBinaryJoinOp) OpType() OperatorType
func (*IncrementalBinaryJoinOp) Process ¶
func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
func (*IncrementalBinaryJoinOp) Reset ¶
func (op *IncrementalBinaryJoinOp) Reset()
Reset method for testing
type IncrementalExecutionContext ¶
type IncrementalExecutionContext struct {
// contains filtered or unexported fields
}
IncrementalExecutionContext helps track incremental execution state
func NewIncrementalExecutionContext ¶
func NewIncrementalExecutionContext(executor *Executor) *IncrementalExecutionContext
func (*IncrementalExecutionContext) GetCumulativeOutput ¶
func (ctx *IncrementalExecutionContext) GetCumulativeOutput() *DocumentZSet
GetCumulativeOutput returns the current cumulative output
func (*IncrementalExecutionContext) ProcessDelta ¶
func (ctx *IncrementalExecutionContext) ProcessDelta(deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)
ProcessDelta processes one delta and updates cumulative state
func (*IncrementalExecutionContext) Reset ¶
func (ctx *IncrementalExecutionContext) Reset()
Reset the context for a fresh start
type IncrementalGatherOp ¶
type IncrementalGatherOp struct { BaseOp // contains filtered or unexported fields }
Incremental Gather Operation (stateful). Implements optimized gather^Δ with O(|delta|) complexity.
func NewIncrementalGather ¶
func NewIncrementalGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *IncrementalGatherOp
func (*IncrementalGatherOp) HasZeroPreservationProperty ¶
func (op *IncrementalGatherOp) HasZeroPreservationProperty() bool
func (*IncrementalGatherOp) IsTimeInvariant ¶
func (op *IncrementalGatherOp) IsTimeInvariant() bool
func (*IncrementalGatherOp) OpType ¶
func (op *IncrementalGatherOp) OpType() OperatorType
func (*IncrementalGatherOp) Process ¶
func (op *IncrementalGatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
IncrementalGatherOp processes deltas and preserves input document content.
type IncrementalJoinOp ¶
type IncrementalJoinOp struct { BaseOp // contains filtered or unexported fields }
func NewIncrementalJoin ¶
func NewIncrementalJoin(eval Evaluator, inputs []string) *IncrementalJoinOp
func (*IncrementalJoinOp) HasZeroPreservationProperty ¶
func (op *IncrementalJoinOp) HasZeroPreservationProperty() bool
func (*IncrementalJoinOp) IsTimeInvariant ¶
func (op *IncrementalJoinOp) IsTimeInvariant() bool
func (*IncrementalJoinOp) OpType ¶
func (op *IncrementalJoinOp) OpType() OperatorType
func (*IncrementalJoinOp) Process ¶
func (op *IncrementalJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type InputOp ¶
type InputOp struct { BaseOp // contains filtered or unexported fields }
Input node (source of data)
func (*InputOp) HasZeroPreservationProperty ¶
func (*InputOp) IsTimeInvariant ¶
func (*InputOp) OpType ¶
func (n *InputOp) OpType() OperatorType
func (*InputOp) Process ¶
func (n *InputOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
func (*InputOp) SetData ¶
func (n *InputOp) SetData(data *DocumentZSet)
type IntegrationDifferentiationEliminationRule ¶
type IntegrationDifferentiationEliminationRule struct{}
Rule 3: Remove I->D pairs (they cancel out)
func (*IntegrationDifferentiationEliminationRule) Apply ¶
func (r *IntegrationDifferentiationEliminationRule) Apply(graph *ChainGraph) error
func (*IntegrationDifferentiationEliminationRule) CanApply ¶
func (r *IntegrationDifferentiationEliminationRule) CanApply(graph *ChainGraph) bool
func (*IntegrationDifferentiationEliminationRule) Name ¶
func (r *IntegrationDifferentiationEliminationRule) Name() string
type IntegratorOp ¶
type IntegratorOp struct { BaseOp // contains filtered or unexported fields }
IntegratorOp implements the I operator, which converts deltas to snapshots: I(s)[t] = Σ(i=0 to t) s[i].
func NewIntegrator ¶
func NewIntegrator() *IntegratorOp
func (*IntegratorOp) HasZeroPreservationProperty ¶
func (n *IntegratorOp) HasZeroPreservationProperty() bool
func (*IntegratorOp) IsTimeInvariant ¶
func (n *IntegratorOp) IsTimeInvariant() bool
func (*IntegratorOp) OpType ¶
func (n *IntegratorOp) OpType() OperatorType
func (*IntegratorOp) Process ¶
func (n *IntegratorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
func (*IntegratorOp) Reset ¶
func (n *IntegratorOp) Reset()
Reset state (useful for testing or restarting computation)
type JoinIncrementalizationRule ¶
type JoinIncrementalizationRule struct{}
Rule 1: Convert N-ary join to incremental version
func (*JoinIncrementalizationRule) Apply ¶
func (r *JoinIncrementalizationRule) Apply(graph *ChainGraph) error
func (*JoinIncrementalizationRule) CanApply ¶
func (r *JoinIncrementalizationRule) CanApply(graph *ChainGraph) bool
func (*JoinIncrementalizationRule) Name ¶
func (r *JoinIncrementalizationRule) Name() string
type JoinOp ¶
type JoinOp struct { BaseOp // contains filtered or unexported fields }
Snapshot N-ary join (non-incremental)
func (*JoinOp) HasZeroPreservationProperty ¶
func (*JoinOp) IsTimeInvariant ¶
func (*JoinOp) OpType ¶
func (op *JoinOp) OpType() OperatorType
func (*JoinOp) Process ¶
func (op *JoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type LinearChainIncrementalizationRule ¶
type LinearChainIncrementalizationRule struct{}
Rule 2: Incrementalize the entire linear chain
func (*LinearChainIncrementalizationRule) Apply ¶
func (r *LinearChainIncrementalizationRule) Apply(graph *ChainGraph) error
func (*LinearChainIncrementalizationRule) CanApply ¶
func (r *LinearChainIncrementalizationRule) CanApply(graph *ChainGraph) bool
func (*LinearChainIncrementalizationRule) Name ¶
func (r *LinearChainIncrementalizationRule) Name() string
type LinearChainRewriteEngine ¶
type LinearChainRewriteEngine struct {
// contains filtered or unexported fields
}
LinearChainRewriteEngine works directly on ChainGraph.
func NewLinearChainRewriteEngine ¶
func NewLinearChainRewriteEngine() *LinearChainRewriteEngine
func (*LinearChainRewriteEngine) AddRule ¶
func (re *LinearChainRewriteEngine) AddRule(rule LinearChainRule)
func (*LinearChainRewriteEngine) Optimize ¶
func (re *LinearChainRewriteEngine) Optimize(graph *ChainGraph) error
Optimize applies all rules until fixpoint
type LinearChainRule ¶
type LinearChainRule interface { Name() string CanApply(graph *ChainGraph) bool Apply(graph *ChainGraph) error }
type LinearOperatorFusionRule ¶
type LinearOperatorFusionRule struct{}
Rule 5: Fuse adjacent linear operations for efficiency
func (*LinearOperatorFusionRule) Apply ¶
func (r *LinearOperatorFusionRule) Apply(graph *ChainGraph) error
func (*LinearOperatorFusionRule) CanApply ¶
func (r *LinearOperatorFusionRule) CanApply(graph *ChainGraph) bool
func (*LinearOperatorFusionRule) Name ¶
func (r *LinearOperatorFusionRule) Name() string
type NegateOp ¶
type NegateOp struct {
BaseOp
}
Negation node
func (*NegateOp) Process ¶
func (n *NegateOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type Operator ¶
type Operator interface { // Process input ZSets and produce output ZSet Process(inputs ...*DocumentZSet) (*DocumentZSet, error) // Get input arity (number of inputs expected) Arity() int // Critical for rewrite engine OpType() OperatorType // For incremental transformation IsTimeInvariant() bool HasZeroPreservationProperty() bool // contains filtered or unexported methods }
Operator represents a computation node in the graph
func FuseFilterProject ¶
func FuseFilterProject(filter *SelectionOp, project *ProjectionOp) (Operator, error)
Helper function for safe fusion
func IncrementalizeOp ¶
IncrementalizeOp converts a "snapshot" operator into an incremental operator and returns a boolean to signal whether the conversion was successful.
type OperatorType ¶
type OperatorType int
OperatorType classifies operators for rewrite rules
const ( OpTypeLinear OperatorType = iota // Op^Δ = Op OpTypeBilinear // Op^Δ needs expansion (like joins) OpTypeNonLinear // Op^Δ needs special handling (like distinct) OpTypeStructural // Graph structure (add, subtract, etc.) )
type ProjectThenSelectOp ¶
type ProjectThenSelectOp struct { BaseOp // contains filtered or unexported fields }
ProjectThenSelectOp - True fusion: project then select in single pass
func NewProjectThenSelect ¶
func NewProjectThenSelect(projEval, selEval Evaluator) *ProjectThenSelectOp
func (*ProjectThenSelectOp) HasZeroPreservationProperty ¶
func (op *ProjectThenSelectOp) HasZeroPreservationProperty() bool
func (*ProjectThenSelectOp) IsTimeInvariant ¶
func (op *ProjectThenSelectOp) IsTimeInvariant() bool
func (*ProjectThenSelectOp) OpType ¶
func (op *ProjectThenSelectOp) OpType() OperatorType
func (*ProjectThenSelectOp) Process ¶
func (op *ProjectThenSelectOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type ProjectionOp ¶
type ProjectionOp struct { BaseOp // contains filtered or unexported fields }
Projection node
func NewProjection ¶
func NewProjection(eval Evaluator) *ProjectionOp
func (*ProjectionOp) HasZeroPreservationProperty ¶
func (n *ProjectionOp) HasZeroPreservationProperty() bool
func (*ProjectionOp) IsTimeInvariant ¶
func (n *ProjectionOp) IsTimeInvariant() bool
func (*ProjectionOp) OpType ¶
func (n *ProjectionOp) OpType() OperatorType
func (*ProjectionOp) Process ¶
func (n *ProjectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type SelectThenProjectionsOp ¶
type SelectThenProjectionsOp struct { BaseOp // contains filtered or unexported fields }
SelectThenProjectionsOp - optional selection + 1+ projections
func NewSelectThenProjections ¶
func NewSelectThenProjections(selEval Evaluator, projEvals []Evaluator) *SelectThenProjectionsOp
func (*SelectThenProjectionsOp) HasZeroPreservationProperty ¶
func (op *SelectThenProjectionsOp) HasZeroPreservationProperty() bool
func (*SelectThenProjectionsOp) IsTimeInvariant ¶
func (op *SelectThenProjectionsOp) IsTimeInvariant() bool
func (*SelectThenProjectionsOp) OpType ¶
func (op *SelectThenProjectionsOp) OpType() OperatorType
func (*SelectThenProjectionsOp) Process ¶
func (op *SelectThenProjectionsOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type SelectionOp ¶
type SelectionOp struct { BaseOp // contains filtered or unexported fields }
Selection node
func NewSelection ¶
func NewSelection(eval Evaluator) *SelectionOp
func (*SelectionOp) HasZeroPreservationProperty ¶
func (n *SelectionOp) HasZeroPreservationProperty() bool
func (*SelectionOp) IsTimeInvariant ¶
func (n *SelectionOp) IsTimeInvariant() bool
func (*SelectionOp) OpType ¶
func (n *SelectionOp) OpType() OperatorType
func (*SelectionOp) Process ¶
func (n *SelectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type SubtractOp ¶
type SubtractOp struct {
BaseOp
}
Subtraction node
func NewSubtract ¶
func NewSubtract() *SubtractOp
func (*SubtractOp) Process ¶
func (n *SubtractOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
type Transformer ¶
Transform documents by setting/replacing fields.
type UnwindOp ¶
type UnwindOp struct { BaseOp // contains filtered or unexported fields }
UnwindOp flattens arrays within documents.
func NewUnwind ¶
func NewUnwind(arrayExtractor Extractor, transformer Transformer) *UnwindOp
func (*UnwindOp) HasZeroPreservationProperty ¶
func (*UnwindOp) IsTimeInvariant ¶
func (*UnwindOp) OpType ¶
func (op *UnwindOp) OpType() OperatorType
func (*UnwindOp) Process ¶
func (op *UnwindOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)