dbsp

package
v0.0.0-...-4ce99f6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeepCopyAny

func DeepCopyAny(val any) any

DeepCopyAny creates a deep copy of a document or any nested structure

func DeepEqual

func DeepEqual(a, b Document) (bool, error)

DeepEqual checks if two documents are equal using JSON comparison

Types

type AddOp

type AddOp struct {
	BaseOp
}

Addition node

func NewAdd

func NewAdd() *AddOp

func (*AddOp) Process

func (n *AddOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type AggregateInput

type AggregateInput struct {
	Key    any
	Values []any
}

type BaseOp

type BaseOp struct {
	// contains filtered or unexported fields
}

Base implementation for validation

func NewBaseOp

func NewBaseOp(name string, arity int) BaseOp

func (*BaseOp) Arity

func (n *BaseOp) Arity() int

type BinaryJoinOp

type BinaryJoinOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Binary join

func NewBinaryJoin

func NewBinaryJoin(eval Evaluator, inputs []string) *BinaryJoinOp

func (*BinaryJoinOp) HasZeroPreservationProperty

func (n *BinaryJoinOp) HasZeroPreservationProperty() bool

func (*BinaryJoinOp) IsTimeInvariant

func (n *BinaryJoinOp) IsTimeInvariant() bool

func (*BinaryJoinOp) OpType

func (n *BinaryJoinOp) OpType() OperatorType

func (*BinaryJoinOp) Process

func (n *BinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type ChainGraph

type ChainGraph struct {
	// contains filtered or unexported fields
}

ChainGraph represents your specialized graph structure directly Always: [Inputs] -> [Optional N-ary Join] -> [Chain of Linear Ops] -> [Output]

func NewChainGraph

func NewChainGraph() *ChainGraph

func (*ChainGraph) AddInput

func (g *ChainGraph) AddInput(op *InputOp) string

AddInput adds an input node

func (*ChainGraph) AddToChain

func (g *ChainGraph) AddToChain(op Operator) string

AddToChain adds an operation to the linear chain

func (*ChainGraph) Arity

func (g *ChainGraph) Arity() int

Arity returns the number of inputs of the chain.

func (*ChainGraph) GetChain

func (g *ChainGraph) GetChain() []Operator

GetChain returns the operations along the linear chain.

func (*ChainGraph) GetInput

func (g *ChainGraph) GetInput(name string) *InputOp

GetInput returns a named input of the chain.

func (*ChainGraph) GetStartNode

func (g *ChainGraph) GetStartNode() string

GetStartNode returns the node where the linear chain begins

func (*ChainGraph) SetJoin

func (g *ChainGraph) SetJoin(op Operator) string

SetJoin sets the join node (if multiple inputs)

func (*ChainGraph) String

func (g *ChainGraph) String() string

String representation for debugging (horizontal layout)

func (*ChainGraph) Validate

func (g *ChainGraph) Validate() error

Validate checks the graph structure

type ConstantOp

type ConstantOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Constant node

func NewConstant

func NewConstant(value *DocumentZSet, name string) *ConstantOp

func (*ConstantOp) Process

func (n *ConstantOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type DelayOp

type DelayOp struct {
	BaseOp
	// contains filtered or unexported fields
}

DelayOp implements the z^(-1) operator: delays stream by one timestep PROBLEMATIC: This is stateful and needs careful handling in the execution model

func NewDelay

func NewDelay() *DelayOp

func (*DelayOp) HasZeroPreservationProperty

func (n *DelayOp) HasZeroPreservationProperty() bool

func (*DelayOp) IsTimeInvariant

func (n *DelayOp) IsTimeInvariant() bool

func (*DelayOp) OpType

func (n *DelayOp) OpType() OperatorType

func (*DelayOp) Process

func (n *DelayOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

func (*DelayOp) Reset

func (n *DelayOp) Reset()

type DeltaZSet

type DeltaZSet = map[string]*DocumentZSet

type DifferentiatorOp

type DifferentiatorOp struct {
	BaseOp
	// contains filtered or unexported fields
}

DifferentiatorOp implements the D operator: converts snapshots to deltas D(s)[t] = s[t] - s[t-1]

func NewDifferentiator

func NewDifferentiator() *DifferentiatorOp

func (*DifferentiatorOp) HasZeroPreservationProperty

func (n *DifferentiatorOp) HasZeroPreservationProperty() bool

func (*DifferentiatorOp) IsTimeInvariant

func (n *DifferentiatorOp) IsTimeInvariant() bool

func (*DifferentiatorOp) OpType

func (n *DifferentiatorOp) OpType() OperatorType

func (*DifferentiatorOp) Process

func (n *DifferentiatorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

func (*DifferentiatorOp) Reset

func (n *DifferentiatorOp) Reset()

Reset state (useful for testing or restarting computation)

type DistinctOp

type DistinctOp struct {
	BaseOp
}

Distinct node

func NewDistinct

func NewDistinct() *DistinctOp

func (*DistinctOp) HasZeroPreservationProperty

func (n *DistinctOp) HasZeroPreservationProperty() bool

func (*DistinctOp) IsTimeInvariant

func (n *DistinctOp) IsTimeInvariant() bool

func (*DistinctOp) OpType

func (n *DistinctOp) OpType() OperatorType

func (*DistinctOp) Process

func (n *DistinctOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type DistinctOptimizationRule

type DistinctOptimizationRule struct{}

Rule 4: Optimize distinct operations

func (*DistinctOptimizationRule) Apply

func (r *DistinctOptimizationRule) Apply(graph *ChainGraph) error

func (*DistinctOptimizationRule) CanApply

func (r *DistinctOptimizationRule) CanApply(graph *ChainGraph) bool

func (*DistinctOptimizationRule) Name

func (r *DistinctOptimizationRule) Name() string

type Document

type Document = map[string]any

Document represents an unstructured document as map[string]any Can contain embedded maps, slices, and primitives (int64, float64, string, bool)

func DeepCopyDocument

func DeepCopyDocument(val any) Document

DeepCopyDocument creates a deep copy of a document.

func NewDocument

func NewDocument() Document

Helper functions for creating documents and Z-sets

type DocumentEntry

type DocumentEntry struct {
	Document     Document
	Multiplicity int
}

DocumentEntry represents a document with its multiplicity in a Z-set.

type DocumentZSet

type DocumentZSet struct {
	// contains filtered or unexported fields
}

DocumentZSet implements Z-sets for atomic documents Documents are treated as opaque units - no internal structure operations

func FromDocuments

func FromDocuments(docs []Document) (*DocumentZSet, error)

FromDocuments creates a Z-set from a slice of documents (each with multiplicity 1)

func NewDocumentZSet

func NewDocumentZSet() *DocumentZSet

NewDocumentZSet creates an empty DocumentZSet

func SingletonZSet

func SingletonZSet(doc Document) (*DocumentZSet, error)

SingletonZSet creates a Z-set containing a single document with multiplicity 1

func (*DocumentZSet) Add

func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error)

Add performs Z-set addition (union with multiplicity)

func (*DocumentZSet) AddDocument

func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error)

AddDocument adds a document to the ZSet with given multiplicity by creating a new ZSet. This is the core operation for building Z-sets.

func (*DocumentZSet) AddDocumentMutate

func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error

AddDocumentMutate adds a document to the ZSet with given multiplicity by modifying the Zset in place.

func (*DocumentZSet) Contains

func (dz *DocumentZSet) Contains(doc Document) (bool, error)

Contains checks if a document exists in the Z-set (with positive multiplicity)

func (*DocumentZSet) DeepCopy

func (dz *DocumentZSet) DeepCopy() *DocumentZSet

DeepCopy creates a deep copy of the DocumentZSet

func (*DocumentZSet) Distinct

func (dz *DocumentZSet) Distinct() (*DocumentZSet, error)

Distinct converts Z-set to set semantics (all multiplicities become 1) This is crucial for converting from multiset to set semantics

func (*DocumentZSet) GetDocuments

func (dz *DocumentZSet) GetDocuments() ([]Document, error)

GetDocuments returns all documents as a slice (with multiplicities) Documents with multiplicity n appear n times in the result

func (*DocumentZSet) GetMultiplicity

func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error)

GetMultiplicity returns the multiplicity of a specific document

func (*DocumentZSet) GetUniqueDocuments

func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error)

GetUniqueDocuments returns all unique documents (ignoring multiplicities)

func (*DocumentZSet) IsZero

func (dz *DocumentZSet) IsZero() bool

IsZero checks if the Z-set is empty (no documents with positive multiplicity)

func (*DocumentZSet) List

func (dz *DocumentZSet) List() ([]DocumentEntry, error)

List returns all documents with their multiplicities (including negative ones).

func (*DocumentZSet) ShallowCopy

func (dz *DocumentZSet) ShallowCopy() *DocumentZSet

ShallowCopy creates a shallow copy of the DocumentZSet. Safe because documents are treated as immutable atomic units.

func (*DocumentZSet) Size

func (dz *DocumentZSet) Size() int

Size returns the number of documents counting only positive multiplicities.

func (*DocumentZSet) String

func (dz *DocumentZSet) String() string

String returns a string representation of the Z-set for debugging

func (*DocumentZSet) Subtract

func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error)

Subtract performs Z-set subtraction

func (*DocumentZSet) TotalSize

func (dz *DocumentZSet) TotalSize() int

TotalSize returns the total number of documents, counting both positive and negative multiplicities.

func (*DocumentZSet) Unique

func (dz *DocumentZSet) Unique() (*DocumentZSet, error)

Unique converts Z-set to set semantics preserving multiplicity sign (all multiplicities become +/-1)

func (*DocumentZSet) UniqueCount

func (dz *DocumentZSet) UniqueCount() int

UniqueCount returns number of unique documents (ignoring multiplicities)

type Evaluator

type Evaluator interface {
	Evaluate(Document) ([]Document, error)
	fmt.Stringer
}

Document transformation (current use).

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes incremental queries on the specialized linear chain graph

func NewExecutor

func NewExecutor(graph *ChainGraph, log logr.Logger) (*Executor, error)

func (*Executor) GetExecutionPlan

func (e *Executor) GetExecutionPlan() string

GetExecutionPlan returns a human-readable execution plan

func (*Executor) GetNodeResult

func (e *Executor) GetNodeResult(nodeID string, deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)

GetNodeResult returns intermediate results for debugging (optional caching)

func (*Executor) ProcessDelta

func (e *Executor) ProcessDelta(deltaInputs DeltaZSet) (*DocumentZSet, error)

ProcessDelta processes one delta input and produces delta output This is the core incremental execution method

func (*Executor) Reset

func (e *Executor) Reset()

Reset all stateful nodes (for incremental computation)

type Extractor

type Extractor interface {
	Extract(Document) (any, error)
	fmt.Stringer
}

Extract values from documents.

type FusedOp

type FusedOp struct {
	BaseOp
	// contains filtered or unexported fields
}

FusedOp is a naive fused op that just calls the subsequent nodes process function along the chain. Currently unused.

func NewFusedOp

func NewFusedOp(nodes []Operator, name string) (*FusedOp, error)

func (*FusedOp) HasZeroPreservationProperty

func (n *FusedOp) HasZeroPreservationProperty() bool

func (*FusedOp) IsTimeInvariant

func (n *FusedOp) IsTimeInvariant() bool

func (*FusedOp) OpType

func (n *FusedOp) OpType() OperatorType

func (*FusedOp) Process

func (n *FusedOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type GatherOp

type GatherOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Snapshot Gather Operation (stateless)

func NewGather

func NewGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *GatherOp

func (*GatherOp) HasZeroPreservationProperty

func (op *GatherOp) HasZeroPreservationProperty() bool

func (*GatherOp) IsTimeInvariant

func (op *GatherOp) IsTimeInvariant() bool

func (*GatherOp) OpType

func (op *GatherOp) OpType() OperatorType

func (*GatherOp) Process

func (op *GatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

GatherOp processes each group and preserves input document content.

type GraphNode

type GraphNode struct {
	ID     string
	Op     Operator
	Inputs []*GraphNode
	Output *GraphNode
}

type GroupData

type GroupData struct {
	Key      any
	Values   []any
	Document Document
}

Helper structs

type IncrementalBinaryJoinOp

type IncrementalBinaryJoinOp struct {
	BaseOp
	// contains filtered or unexported fields
}

New stateful incremental join that composes 3 snapshot joins

func NewIncrementalBinaryJoin

func NewIncrementalBinaryJoin(eval Evaluator, inputs []string) *IncrementalBinaryJoinOp

func (*IncrementalBinaryJoinOp) HasZeroPreservationProperty

func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool

func (*IncrementalBinaryJoinOp) IsTimeInvariant

func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool

func (*IncrementalBinaryJoinOp) OpType

func (*IncrementalBinaryJoinOp) Process

func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

func (*IncrementalBinaryJoinOp) Reset

func (op *IncrementalBinaryJoinOp) Reset()

Reset method for testing

type IncrementalExecutionContext

type IncrementalExecutionContext struct {
	// contains filtered or unexported fields
}

IncrementalExecutionContext helps track incremental execution state

func NewIncrementalExecutionContext

func NewIncrementalExecutionContext(executor *Executor) *IncrementalExecutionContext

func (*IncrementalExecutionContext) GetCumulativeOutput

func (ctx *IncrementalExecutionContext) GetCumulativeOutput() *DocumentZSet

GetCumulativeOutput returns the current cumulative output

func (*IncrementalExecutionContext) ProcessDelta

func (ctx *IncrementalExecutionContext) ProcessDelta(deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)

ProcessDelta processes one delta and updates cumulative state

func (*IncrementalExecutionContext) Reset

func (ctx *IncrementalExecutionContext) Reset()

Reset the context for a fresh start

type IncrementalGatherOp

type IncrementalGatherOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Incremental Gather Operation (stateful) Implements optimized gather^Δ with O(|delta|) complexity Incremental Gather Operation (stateful)

func NewIncrementalGather

func NewIncrementalGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *IncrementalGatherOp

func (*IncrementalGatherOp) HasZeroPreservationProperty

func (op *IncrementalGatherOp) HasZeroPreservationProperty() bool

func (*IncrementalGatherOp) IsTimeInvariant

func (op *IncrementalGatherOp) IsTimeInvariant() bool

func (*IncrementalGatherOp) OpType

func (op *IncrementalGatherOp) OpType() OperatorType

func (*IncrementalGatherOp) Process

func (op *IncrementalGatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

IncrementalGatherOp processes deltas and preserves input document content.

func (*IncrementalGatherOp) Reset

func (op *IncrementalGatherOp) Reset()

Reset method for testing

type IncrementalJoinOp

type IncrementalJoinOp struct {
	BaseOp
	// contains filtered or unexported fields
}

func NewIncrementalJoin

func NewIncrementalJoin(eval Evaluator, inputs []string) *IncrementalJoinOp

func (*IncrementalJoinOp) HasZeroPreservationProperty

func (op *IncrementalJoinOp) HasZeroPreservationProperty() bool

func (*IncrementalJoinOp) IsTimeInvariant

func (op *IncrementalJoinOp) IsTimeInvariant() bool

func (*IncrementalJoinOp) OpType

func (op *IncrementalJoinOp) OpType() OperatorType

func (*IncrementalJoinOp) Process

func (op *IncrementalJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type InputOp

type InputOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Input node (source of data)

func NewInput

func NewInput(name string) *InputOp

func (*InputOp) HasZeroPreservationProperty

func (n *InputOp) HasZeroPreservationProperty() bool

func (*InputOp) IsTimeInvariant

func (n *InputOp) IsTimeInvariant() bool

func (*InputOp) Name

func (n *InputOp) Name() string

func (*InputOp) OpType

func (n *InputOp) OpType() OperatorType

func (*InputOp) Process

func (n *InputOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

func (*InputOp) SetData

func (n *InputOp) SetData(data *DocumentZSet)

type IntegrationDifferentiationEliminationRule

type IntegrationDifferentiationEliminationRule struct{}

Rule 3: Remove I->D pairs (they cancel out)

func (*IntegrationDifferentiationEliminationRule) Apply

func (*IntegrationDifferentiationEliminationRule) CanApply

func (*IntegrationDifferentiationEliminationRule) Name

type IntegratorOp

type IntegratorOp struct {
	BaseOp
	// contains filtered or unexported fields
}

IntegratorOp implements the I operator: converts deltas to snapshots I(s)[t] = Σ(i=0 to t) s[i]

func NewIntegrator

func NewIntegrator() *IntegratorOp

func (*IntegratorOp) HasZeroPreservationProperty

func (n *IntegratorOp) HasZeroPreservationProperty() bool

func (*IntegratorOp) IsTimeInvariant

func (n *IntegratorOp) IsTimeInvariant() bool

func (*IntegratorOp) OpType

func (n *IntegratorOp) OpType() OperatorType

func (*IntegratorOp) Process

func (n *IntegratorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

func (*IntegratorOp) Reset

func (n *IntegratorOp) Reset()

Reset state (useful for testing or restarting computation)

type JoinIncrementalizationRule

type JoinIncrementalizationRule struct{}

Rule 1: Convert N-ary join to incremental version

func (*JoinIncrementalizationRule) Apply

func (r *JoinIncrementalizationRule) Apply(graph *ChainGraph) error

func (*JoinIncrementalizationRule) CanApply

func (r *JoinIncrementalizationRule) CanApply(graph *ChainGraph) bool

func (*JoinIncrementalizationRule) Name

type JoinOp

type JoinOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Snapshot N-ary join (non-incremental)

func NewJoin

func NewJoin(eval Evaluator, inputs []string) *JoinOp

func (*JoinOp) HasZeroPreservationProperty

func (op *JoinOp) HasZeroPreservationProperty() bool

func (*JoinOp) IsTimeInvariant

func (op *JoinOp) IsTimeInvariant() bool

func (*JoinOp) OpType

func (op *JoinOp) OpType() OperatorType

func (*JoinOp) Process

func (op *JoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type LinearChainIncrementalizationRule

type LinearChainIncrementalizationRule struct{}

Rule 2: Incrementalize the entire linear chain

func (*LinearChainIncrementalizationRule) Apply

func (*LinearChainIncrementalizationRule) CanApply

func (*LinearChainIncrementalizationRule) Name

type LinearChainRewriteEngine

type LinearChainRewriteEngine struct {
	// contains filtered or unexported fields
}

LinearChainRewriteEngine works directly on LinearChainGraph

func NewLinearChainRewriteEngine

func NewLinearChainRewriteEngine() *LinearChainRewriteEngine

func (*LinearChainRewriteEngine) AddRule

func (re *LinearChainRewriteEngine) AddRule(rule LinearChainRule)

func (*LinearChainRewriteEngine) Optimize

func (re *LinearChainRewriteEngine) Optimize(graph *ChainGraph) error

Optimize applies all rules until fixpoint

type LinearChainRule

type LinearChainRule interface {
	Name() string
	CanApply(graph *ChainGraph) bool
	Apply(graph *ChainGraph) error
}

type LinearOperatorFusionRule

type LinearOperatorFusionRule struct{}

Rule 5: Fuse adjacent linear operations for efficiency

func (*LinearOperatorFusionRule) Apply

func (r *LinearOperatorFusionRule) Apply(graph *ChainGraph) error

func (*LinearOperatorFusionRule) CanApply

func (r *LinearOperatorFusionRule) CanApply(graph *ChainGraph) bool

func (*LinearOperatorFusionRule) Name

func (r *LinearOperatorFusionRule) Name() string

type NegateOp

type NegateOp struct {
	BaseOp
}

Negation node

func NewNegate

func NewNegate() *NegateOp

func (*NegateOp) Process

func (n *NegateOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type Operator

type Operator interface {
	// Process input ZSets and produce output ZSet
	Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

	// Get input arity (number of inputs expected)
	Arity() int

	// Critical for rewrite engine
	OpType() OperatorType

	// For incremental transformation
	IsTimeInvariant() bool
	HasZeroPreservationProperty() bool
	// contains filtered or unexported methods
}

Operator represents a computation node in the graph

func FuseFilterProject

func FuseFilterProject(filter *SelectionOp, project *ProjectionOp) (Operator, error)

Helper function for safe fusion

func IncrementalizeOp

func IncrementalizeOp(in Operator) (Operator, bool)

IncrementalizeOp converts a "snapshot" operator into an incremental operator and returns a boolean to signal whether the conversion was successful.

type OperatorType

type OperatorType int

OperatorType classifies operators for rewrite rules

const (
	OpTypeLinear     OperatorType = iota // Op^Δ = Op
	OpTypeBilinear                       // Op^Δ needs expansion (like joins)
	OpTypeNonLinear                      // Op^Δ needs special handling (like distinct)
	OpTypeStructural                     // Graph structure (add, subtract, etc.)
)

type ProjectThenSelectOp

type ProjectThenSelectOp struct {
	BaseOp
	// contains filtered or unexported fields
}

ProjectThenSelectOp - True fusion: project then select in single pass

func NewProjectThenSelect

func NewProjectThenSelect(projEval, selEval Evaluator) *ProjectThenSelectOp

func (*ProjectThenSelectOp) HasZeroPreservationProperty

func (op *ProjectThenSelectOp) HasZeroPreservationProperty() bool

func (*ProjectThenSelectOp) IsTimeInvariant

func (op *ProjectThenSelectOp) IsTimeInvariant() bool

func (*ProjectThenSelectOp) OpType

func (op *ProjectThenSelectOp) OpType() OperatorType

func (*ProjectThenSelectOp) Process

func (op *ProjectThenSelectOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type ProjectionOp

type ProjectionOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Projection node

func NewProjection

func NewProjection(eval Evaluator) *ProjectionOp

func (*ProjectionOp) HasZeroPreservationProperty

func (n *ProjectionOp) HasZeroPreservationProperty() bool

func (*ProjectionOp) IsTimeInvariant

func (n *ProjectionOp) IsTimeInvariant() bool

func (*ProjectionOp) OpType

func (n *ProjectionOp) OpType() OperatorType

func (*ProjectionOp) Process

func (n *ProjectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type SelectThenProjectionsOp

type SelectThenProjectionsOp struct {
	BaseOp
	// contains filtered or unexported fields
}

SelectThenProjectionsOp - optional selection + 1+ projections

func NewSelectThenProjections

func NewSelectThenProjections(selEval Evaluator, projEvals []Evaluator) *SelectThenProjectionsOp

func (*SelectThenProjectionsOp) HasZeroPreservationProperty

func (op *SelectThenProjectionsOp) HasZeroPreservationProperty() bool

func (*SelectThenProjectionsOp) IsTimeInvariant

func (op *SelectThenProjectionsOp) IsTimeInvariant() bool

func (*SelectThenProjectionsOp) OpType

func (*SelectThenProjectionsOp) Process

func (op *SelectThenProjectionsOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type SelectionOp

type SelectionOp struct {
	BaseOp
	// contains filtered or unexported fields
}

Selection node

func NewSelection

func NewSelection(eval Evaluator) *SelectionOp

func (*SelectionOp) HasZeroPreservationProperty

func (n *SelectionOp) HasZeroPreservationProperty() bool

func (*SelectionOp) IsTimeInvariant

func (n *SelectionOp) IsTimeInvariant() bool

func (*SelectionOp) OpType

func (n *SelectionOp) OpType() OperatorType

func (*SelectionOp) Process

func (n *SelectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type SubtractOp

type SubtractOp struct {
	BaseOp
}

Subtraction node

func NewSubtract

func NewSubtract() *SubtractOp

func (*SubtractOp) Process

func (n *SubtractOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type Transformer

type Transformer interface {
	Transform(Document, any) (Document, error)
	fmt.Stringer
}

Transform documents by setting/replacing fields.

type UnwindOp

type UnwindOp struct {
	BaseOp
	// contains filtered or unexported fields
}

UnwindOp flattens arrays within documents.

func NewUnwind

func NewUnwind(arrayExtractor Extractor, transformer Transformer) *UnwindOp

func (*UnwindOp) HasZeroPreservationProperty

func (op *UnwindOp) HasZeroPreservationProperty() bool

func (*UnwindOp) IsTimeInvariant

func (op *UnwindOp) IsTimeInvariant() bool

func (*UnwindOp) OpType

func (op *UnwindOp) OpType() OperatorType

func (*UnwindOp) Process

func (op *UnwindOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)

type ZSetError

type ZSetError struct {
	Message string
	Cause   error
}

Error type for better error handling

func (*ZSetError) Error

func (e *ZSetError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL