Documentation ¶
Index ¶
- Constants
- func ExplainLogical(plan *LogicalPlan) string
- type AggregateNode
- type Executor
- type FilterNode
- type JoinNode
- type LimitNode
- type LogicalNode
- type LogicalPlan
- type PhysAggregate
- type PhysFilter
- type PhysJoin
- type PhysLimit
- type PhysProject
- type PhysScan
- type PhysSort
- type PhysWithColumn
- type PhysicalNode
- type PhysicalPlan
- type ProjectNode
- type ScanNode
- type SortNode
- type StreamingNode
- type WithColumnNode
Constants ¶
const (
// ScanSourceDataFrame is an in-memory DataFrame scan. The ScanNode.Handle
// is the *dataframe.DataFrame to materialize.
ScanSourceDataFrame = "dataframe"
// ScanSourceFile is a file-backed scan that the executor opens as a
// streaming RecordReader rather than materializing the whole file. The
// ScanNode.Handle carries a file-scan descriptor the executor understands
// (e.g. path + reader options).
ScanSourceFile = "file"
)
Functions ¶
func ExplainLogical ¶
func ExplainLogical(plan *LogicalPlan) string
Types ¶
type AggregateNode ¶
type AggregateNode struct {
Input LogicalNode
GroupKeys []string
Aggs []expr.AggNode
// contains filtered or unexported fields
}
AggregateNode is a grouped aggregation. It is named AggregateNode (not AggNode) to avoid colliding with expr.AggNode, the column-level aggregate it carries in Aggs.
func NewAggregateNode ¶
func NewAggregateNode(input LogicalNode, groupKeys []string, aggs []expr.AggNode) *AggregateNode
func (*AggregateNode) Children ¶
func (a *AggregateNode) Children() []LogicalNode
func (*AggregateNode) Name ¶
func (a *AggregateNode) Name() string
func (*AggregateNode) Schema ¶
func (a *AggregateNode) Schema() *schema.Schema
type Executor ¶
type Executor interface {
// Scan materializes the source handle into a DataFrame, optionally honoring
// scan-level pushdown annotations (columns, limit, filters). node.Handle is
// the source handle (e.g. a *dataframe.DataFrame).
Scan(ctx context.Context, node *ScanNode) (any, error)
Filter(ctx context.Context, df any, predicate expr.Expr) (any, error)
Project(df any, cols []string) (any, error)
Limit(df any, n int64) (any, error)
Sort(ctx context.Context, df any, keys []expr.SortKey) (any, error)
Aggregate(ctx context.Context, df any, groupKeys []string, aggs []expr.AggNode) (any, error)
Join(ctx context.Context, left, right any, on string, how string) (any, error)
WithColumn(df any, e expr.Expr) (any, error)
// ScanStream opens a file-backed ScanNode as a streaming RecordReader,
// honoring pushed column/limit annotations where the source supports them.
ScanStream(ctx context.Context, node *ScanNode) (any, error)
// FilterStream applies predicate to each batch of reader, yielding a reader
// over the surviving rows.
FilterStream(ctx context.Context, reader any, predicate expr.Expr) (any, error)
// ProjectStream narrows each batch of reader to cols.
ProjectStream(reader any, cols []string) (any, error)
// LimitStream stops the stream once n rows have been emitted across batches.
LimitStream(reader any, n int64) any
// StreamFromDataFrame adapts a materialized DataFrame handle into a
// RecordReader so a pipeline-breaking operator's output can be streamed.
StreamFromDataFrame(df any) (any, error)
}
Executor bridges the plan package to the dataframe package without an import cycle. The dataframe package already imports plan, so plan cannot import dataframe back. Instead, the dataframe package provides a concrete implementation of this interface and injects it into PhysicalPlan.Execute.
All DataFrame values cross this boundary as any: each method receives and returns the opaque handle the implementation understands (a *dataframe.DataFrame), keeping plan free of a dataframe import.
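The arrangement described above can be sketched in a single file. This is a minimal illustration, not the package's code: `limiter`, `frame`, `frameExecutor`, and `runLimit` are hypothetical stand-ins, and only one method of the interface is modeled. The plan-side function sees nothing but `any`; the concrete side recovers its own type with a type assertion.

```go
package main

import (
	"errors"
	"fmt"
)

// limiter is a hypothetical one-method slice of an Executor-style interface.
// It belongs to the "plan" side and names no concrete frame type.
type limiter interface {
	Limit(df any, n int64) (any, error)
}

// frame is a hypothetical stand-in for *dataframe.DataFrame.
type frame struct{ rows []int }

// frameExecutor belongs to the "dataframe" side and knows the concrete type.
type frameExecutor struct{}

// Limit recovers the concrete handle with a type assertion and rejects
// any handle it does not recognize.
func (frameExecutor) Limit(df any, n int64) (any, error) {
	f, ok := df.(*frame)
	if !ok {
		return nil, errors.New("limit: unexpected handle type")
	}
	if n < 0 || n > int64(len(f.rows)) {
		n = int64(len(f.rows))
	}
	return &frame{rows: f.rows[:n]}, nil
}

// runLimit is plan-side code: it only ever passes opaque handles along.
func runLimit(exec limiter, handle any, n int64) (any, error) {
	return exec.Limit(handle, n)
}

func main() {
	out, err := runLimit(frameExecutor{}, &frame{rows: []int{1, 2, 3, 4}}, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.(*frame).rows)
}
```

The cost of this design is that handle-type mistakes surface at run time rather than compile time, which is why the sketch checks the assertion result instead of letting it panic.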
type FilterNode ¶
type FilterNode struct {
Input LogicalNode
Predicate expr.Expr
// contains filtered or unexported fields
}
func NewFilterNode ¶
func NewFilterNode(input LogicalNode, predicate expr.Expr) *FilterNode
func (*FilterNode) Children ¶
func (f *FilterNode) Children() []LogicalNode
func (*FilterNode) Name ¶
func (f *FilterNode) Name() string
func (*FilterNode) Schema ¶
func (f *FilterNode) Schema() *schema.Schema
type JoinNode ¶
type JoinNode struct {
Left LogicalNode
Right LogicalNode
On string
How string
// contains filtered or unexported fields
}
func NewJoinNode ¶
func NewJoinNode(left, right LogicalNode, on, how string) *JoinNode
func (*JoinNode) Children ¶
func (j *JoinNode) Children() []LogicalNode
type LimitNode ¶
type LimitNode struct {
Input LogicalNode
N int64
// contains filtered or unexported fields
}
func NewLimitNode ¶
func NewLimitNode(input LogicalNode, n int64) *LimitNode
func (*LimitNode) Children ¶
func (l *LimitNode) Children() []LogicalNode
type LogicalNode ¶
type LogicalNode interface {
Name() string
Schema() *schema.Schema
Children() []LogicalNode
}
type LogicalPlan ¶
type LogicalPlan struct {
Root LogicalNode
}
func Bind ¶
func Bind(plan *LogicalPlan) (*LogicalPlan, error)
func NewLogicalPlan ¶
func NewLogicalPlan(root LogicalNode) *LogicalPlan
func Optimize ¶
func Optimize(p *LogicalPlan) (*LogicalPlan, error)
Optimize runs the logical optimizer passes and returns a new logical plan. The passes annotate the ScanNode with pushdown hints (PushedFilters, PushedColumns, PushedLimit) that the physical executor may honor during scan.
The annotations are advisory: the operator nodes above the scan (Filter, Project, Limit) are preserved, so the plan stays correct whether or not the executor acts on a hint. Today the executor honors PushedLimit during scan; the filter and column hints are recorded for Explain and future scan sources that can prune at the source (e.g. Parquet column/row-group pruning).
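To see why an advisory hint cannot change the result, consider a toy model of a pushed limit. Everything here is an assumption made for illustration: `scanHints`, `scan`, and `limit` are hypothetical, and rows are plain integers. The Limit operator stays above the scan, so the output is the same whether the scan honors the hint or ignores it.

```go
package main

import "fmt"

// scanHints is a hypothetical stand-in for the pushdown fields on ScanNode.
type scanHints struct {
	PushedLimit int64 // <0 means "no limit pushed"
}

// scan returns the source rows. With honor set it applies the pushed limit
// early; otherwise it ignores the hint and returns everything.
func scan(src []int, h scanHints, honor bool) []int {
	if honor && h.PushedLimit >= 0 && h.PushedLimit < int64(len(src)) {
		return src[:h.PushedLimit]
	}
	return src
}

// limit models the Limit operator that is preserved above the scan.
func limit(rows []int, n int64) []int {
	if n >= 0 && n < int64(len(rows)) {
		return rows[:n]
	}
	return rows
}

func main() {
	src := []int{10, 20, 30, 40, 50}
	h := scanHints{PushedLimit: 2}

	// Same answer either way; honoring the hint only reduces scan work.
	fmt.Println(limit(scan(src, h, true), 2))
	fmt.Println(limit(scan(src, h, false), 2))
}
```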
type PhysAggregate ¶
type PhysAggregate struct {
Input PhysicalNode
GroupKeys []string
Aggs []expr.AggNode
// contains filtered or unexported fields
}
func (*PhysAggregate) Children ¶
func (n *PhysAggregate) Children() []PhysicalNode
func (*PhysAggregate) Name ¶
func (n *PhysAggregate) Name() string
func (*PhysAggregate) Schema ¶
func (n *PhysAggregate) Schema() *arrow.Schema
type PhysFilter ¶
type PhysFilter struct {
Input PhysicalNode
Predicate expr.Expr
// contains filtered or unexported fields
}
func (*PhysFilter) Children ¶
func (n *PhysFilter) Children() []PhysicalNode
func (*PhysFilter) ExecuteStream ¶
PhysFilter streams: filter each batch of the upstream reader.
func (*PhysFilter) Name ¶
func (n *PhysFilter) Name() string
func (*PhysFilter) Schema ¶
func (n *PhysFilter) Schema() *arrow.Schema
type PhysJoin ¶
type PhysJoin struct {
Left PhysicalNode
Right PhysicalNode
On string
How string
// contains filtered or unexported fields
}
func (*PhysJoin) Children ¶
func (n *PhysJoin) Children() []PhysicalNode
type PhysLimit ¶
type PhysLimit struct {
Input PhysicalNode
N int64
// contains filtered or unexported fields
}
func (*PhysLimit) Children ¶
func (n *PhysLimit) Children() []PhysicalNode
func (*PhysLimit) ExecuteStream ¶
PhysLimit streams: stop after N rows across batches.
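A limit that spans batches has to truncate the batch that crosses the boundary and then stop pulling input. The sketch below shows that bookkeeping over plain slices; `limitBatches` is a hypothetical helper, not the executor's LimitStream, and it assumes that n <= 0 yields no rows.

```go
package main

import "fmt"

// limitBatches emits batches until n rows have been produced in total,
// truncating the batch that crosses the boundary and dropping the rest.
// Assumption for this sketch: n <= 0 yields no rows.
func limitBatches(batches [][]int, n int64) [][]int {
	var out [][]int
	remaining := n
	for _, b := range batches {
		if remaining <= 0 {
			break // stop pulling input once the limit is satisfied
		}
		if int64(len(b)) > remaining {
			b = b[:remaining]
		}
		out = append(out, b)
		remaining -= int64(len(b))
	}
	return out
}

func main() {
	batches := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8}}
	fmt.Println(limitBatches(batches, 4))
}
```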
type PhysProject ¶
type PhysProject struct {
Input PhysicalNode
Columns []string
// contains filtered or unexported fields
}
func (*PhysProject) Children ¶
func (n *PhysProject) Children() []PhysicalNode
func (*PhysProject) ExecuteStream ¶
PhysProject streams: narrow each batch of the upstream reader.
func (*PhysProject) Name ¶
func (n *PhysProject) Name() string
func (*PhysProject) Schema ¶
func (n *PhysProject) Schema() *arrow.Schema
type PhysScan ¶
type PhysScan struct {
Node *ScanNode
// contains filtered or unexported fields
}
PhysScan reads the scan source (an in-memory DataFrame or a file) through the executor, which honors the pushdown annotations it understands.
func (*PhysScan) Children ¶
func (n *PhysScan) Children() []PhysicalNode
func (*PhysScan) ExecuteStream ¶
PhysScan streams: a file source opens a RecordReader directly; an in-memory DataFrame source materializes (cheaply, it is already in memory) and adapts.
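The two streaming paths can be pictured as a dispatch on the scan source kind. This sketch is hypothetical throughout: the real ScanNode keeps its source kind in an unexported field, so the `Source` field, `scanNode`, and `streamPath` are assumptions, and the returned strings only name the Executor methods each path would plausibly call.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the package constants.
const (
	ScanSourceDataFrame = "dataframe"
	ScanSourceFile      = "file"
)

// scanNode is a hypothetical stand-in for ScanNode; the exported Source
// field is an assumption made for this sketch.
type scanNode struct {
	Source string
	Handle any
}

// streamPath reports which executor calls a streaming scan would make.
func streamPath(n scanNode) (string, error) {
	switch n.Source {
	case ScanSourceFile:
		// File sources open a reader directly, with no full materialization.
		return "ScanStream", nil
	case ScanSourceDataFrame:
		// In-memory sources are already materialized; adapt them to a reader.
		return "Scan + StreamFromDataFrame", nil
	default:
		return "", errors.New("unknown scan source: " + n.Source)
	}
}

func main() {
	for _, src := range []string{ScanSourceFile, ScanSourceDataFrame} {
		path, err := streamPath(scanNode{Source: src})
		fmt.Println(src, "->", path, err)
	}
}
```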
type PhysSort ¶
type PhysSort struct {
Input PhysicalNode
Keys []expr.SortKey
// contains filtered or unexported fields
}
func (*PhysSort) Children ¶
func (n *PhysSort) Children() []PhysicalNode
type PhysWithColumn ¶
type PhysWithColumn struct {
Input PhysicalNode
Expr expr.Expr
// contains filtered or unexported fields
}
func (*PhysWithColumn) Children ¶
func (n *PhysWithColumn) Children() []PhysicalNode
func (*PhysWithColumn) Name ¶
func (n *PhysWithColumn) Name() string
func (*PhysWithColumn) Schema ¶
func (n *PhysWithColumn) Schema() *arrow.Schema
type PhysicalNode ¶
type PhysicalNode interface {
Name() string
Schema() *arrow.Schema
Children() []PhysicalNode
Execute(ctx context.Context, exec Executor) (any, error)
}
PhysicalNode is one executable operator. Execute runs the node against the injected Executor and returns the resulting DataFrame as an opaque handle.
type PhysicalPlan ¶
type PhysicalPlan struct {
Root PhysicalNode
}
func Lower ¶
func Lower(p *LogicalPlan) (*PhysicalPlan, error)
Lower translates an (optimized) logical plan into an executable physical plan.
func NewPhysicalPlan ¶
func NewPhysicalPlan(root PhysicalNode) *PhysicalPlan
func (*PhysicalPlan) Execute ¶
Execute runs the physical plan against exec and returns the resulting DataFrame handle (a *dataframe.DataFrame).
func (*PhysicalPlan) ExecuteStream ¶
ExecuteStream runs the physical plan in streaming mode, returning a RecordReader handle (an any wrapping a scan.RecordReader) instead of a DataFrame. Nodes that implement StreamingNode stream natively; every other node materializes (via its Execute) and is adapted into a reader, which is exactly the pipeline-breaking contract.
func (*PhysicalPlan) Explain ¶
func (p *PhysicalPlan) Explain() string
Explain renders the physical plan tree with operator names and any scan-level pushdown annotations recorded by the optimizer.
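Rendering a plan tree needs only Name and Children. The following is a minimal sketch of one way to do it with a depth-first walk; the `node` interface, `op` type, and two-space indentation are assumptions, and the actual Explain output format (including how pushdown annotations appear) is not shown in this reference.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a hypothetical subset of the PhysicalNode interface.
type node interface {
	Name() string
	Children() []node
}

// op is a hypothetical operator used to build a toy plan.
type op struct {
	name string
	kids []node
}

func (o op) Name() string     { return o.name }
func (o op) Children() []node { return o.kids }

// render writes n and its descendants depth-first, one operator per line,
// indenting two spaces per level.
func render(n node, depth int, sb *strings.Builder) {
	sb.WriteString(strings.Repeat("  ", depth))
	sb.WriteString(n.Name())
	sb.WriteString("\n")
	for _, c := range n.Children() {
		render(c, depth+1, sb)
	}
}

// explain returns the rendered tree rooted at root.
func explain(root node) string {
	var sb strings.Builder
	render(root, 0, &sb)
	return sb.String()
}

func main() {
	plan := op{"Limit", []node{op{"Filter", []node{op{"Scan", nil}}}}}
	fmt.Print(explain(plan))
}
```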
type ProjectNode ¶
type ProjectNode struct {
Input LogicalNode
Columns []string
// contains filtered or unexported fields
}
func NewProjectNode ¶
func NewProjectNode(input LogicalNode, columns []string) *ProjectNode
func (*ProjectNode) Children ¶
func (p *ProjectNode) Children() []LogicalNode
func (*ProjectNode) Name ¶
func (p *ProjectNode) Name() string
func (*ProjectNode) Schema ¶
func (p *ProjectNode) Schema() *schema.Schema
type ScanNode ¶
type ScanNode struct {
// Handle is the opaque source object the executor scans (e.g. a
// *dataframe.DataFrame). It is typed as any to keep plan free of a
// dataframe import.
Handle any
// Pushdown annotations produced by Optimize. They are advisory: the
// executor honors whichever it understands and the surviving logical
// nodes above the scan still produce a correct result if it does not.
PushedFilters []expr.Expr
PushedColumns []string
PushedLimit int64 // <0 means "no limit pushed"
// contains filtered or unexported fields
}
func (*ScanNode) Children ¶
func (s *ScanNode) Children() []LogicalNode
type SortNode ¶
type SortNode struct {
Input LogicalNode
Keys []expr.SortKey
// contains filtered or unexported fields
}
func NewSortNode ¶
func NewSortNode(input LogicalNode, keys []expr.SortKey) *SortNode
func (*SortNode) Children ¶
func (s *SortNode) Children() []LogicalNode
type StreamingNode ¶
type StreamingNode interface {
PhysicalNode
// ExecuteStream runs the node and returns a streaming reader handle.
ExecuteStream(ctx context.Context, exec Executor) (any, error)
}
StreamingNode is a physical node that can produce its result as a streaming RecordReader handle (carried as any; see Executor) instead of a materialized DataFrame. Streamable operators (Scan over a file, Filter, Project, Limit) transform batches in flight. Pipeline-breaking operators (Sort, Aggregate, Join, WithColumn) cannot emit a correct batch until they have seen all input, so they materialize via Execute and then stream the result through the executor's StreamFromDataFrame. PhysicalPlan.ExecuteStream applies this fallback to every node that does not implement StreamingNode.
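The streaming fallback amounts to a type assertion followed by a materialize-and-adapt step. The sketch below is an assumption-laden miniature, not the package's implementation: the lowercase interfaces mirror the documented ones but drop the context parameter for brevity, and `filterNode`, `sortNode`, and `fakeExec` are hypothetical.

```go
package main

import "fmt"

// executor is a hypothetical one-method slice of Executor.
type executor interface {
	StreamFromDataFrame(df any) (any, error)
}

// physicalNode and streamingNode mirror the documented interfaces,
// with the context parameter omitted for brevity.
type physicalNode interface {
	Execute(exec executor) (any, error)
}

type streamingNode interface {
	physicalNode
	ExecuteStream(exec executor) (any, error)
}

// executeStream streams natively when the node supports it; otherwise it
// materializes via Execute and adapts the result into a reader.
func executeStream(n physicalNode, exec executor) (any, error) {
	if s, ok := n.(streamingNode); ok {
		return s.ExecuteStream(exec)
	}
	df, err := n.Execute(exec)
	if err != nil {
		return nil, err
	}
	return exec.StreamFromDataFrame(df)
}

// filterNode streams natively.
type filterNode struct{}

func (filterNode) Execute(executor) (any, error)       { return "frame", nil }
func (filterNode) ExecuteStream(executor) (any, error) { return "native stream", nil }

// sortNode is pipeline-breaking: it has no ExecuteStream method.
type sortNode struct{}

func (sortNode) Execute(executor) (any, error) { return "sorted frame", nil }

// fakeExec wraps a materialized handle to mark it as adapted.
type fakeExec struct{}

func (fakeExec) StreamFromDataFrame(df any) (any, error) {
	return fmt.Sprintf("adapted(%v)", df), nil
}

func main() {
	a, _ := executeStream(filterNode{}, fakeExec{})
	b, _ := executeStream(sortNode{}, fakeExec{})
	fmt.Println(a)
	fmt.Println(b)
}
```

Keeping the fallback in one place means a new operator is correct in streaming mode by default and only needs its own ExecuteStream when it can genuinely process batches in flight.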
type WithColumnNode ¶
type WithColumnNode struct {
Input LogicalNode
Expr expr.Expr
// contains filtered or unexported fields
}
func NewWithColumnNode ¶
func NewWithColumnNode(input LogicalNode, e expr.Expr) *WithColumnNode
func (*WithColumnNode) Children ¶
func (w *WithColumnNode) Children() []LogicalNode
func (*WithColumnNode) Name ¶
func (w *WithColumnNode) Name() string
func (*WithColumnNode) Schema ¶
func (w *WithColumnNode) Schema() *schema.Schema