Version: v1.0.3

This package is not in the latest version of its module.

Published: Jul 6, 2017 License: Apache-2.0 Imports: 16 Imported by: 0




var DistAggregationTable = map[distsqlrun.AggregatorSpec_Func]DistAggregationInfo{
	distsqlrun.AggregatorSpec_IDENT: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_IDENT},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_IDENT},
	},

	distsqlrun.AggregatorSpec_BOOL_AND: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_BOOL_AND},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_BOOL_AND},
	},

	distsqlrun.AggregatorSpec_BOOL_OR: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_BOOL_OR},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_BOOL_OR},
	},

	distsqlrun.AggregatorSpec_COUNT: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_COUNT},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_SUM_INT},
	},

	distsqlrun.AggregatorSpec_MAX: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_MAX},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_MAX},
	},

	distsqlrun.AggregatorSpec_MIN: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_MIN},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_MIN},
	},

	distsqlrun.AggregatorSpec_SUM: {
		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_SUM},
		FinalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_SUM},
	},

	distsqlrun.AggregatorSpec_AVG: {
		// The local stage computes a sum and a count; the final stage adds up
		// the partial sums and the partial counts.
		LocalStage: []distsqlrun.AggregatorSpec_Func{
			distsqlrun.AggregatorSpec_SUM,
			distsqlrun.AggregatorSpec_COUNT,
		},
		FinalStage: []distsqlrun.AggregatorSpec_Func{
			distsqlrun.AggregatorSpec_SUM,
			distsqlrun.AggregatorSpec_SUM_INT,
		},
		FinalRendering: func(h *parser.IndexedVarHelper, varIdxOffset int) (parser.TypedExpr, error) {
			sum := h.IndexedVar(varIdxOffset)
			count := h.IndexedVar(varIdxOffset + 1)

			expr := &parser.BinaryExpr{
				Operator: parser.Div,
				Left:     sum,
				Right:    count,
			}

			// When the sum is a float, cast the integer count to float so that
			// both operands of the division are floats.
			if sum.ResolvedType().Equivalent(parser.TypeFloat) {
				expr.Right = &parser.CastExpr{
					Expr: count,
					Type: parser.NewFloatColType(0, false),
				}
			}
			return expr.TypeCheck(nil, parser.TypeAny)
		},
	},
}

DistAggregationTable is the DistAggregationInfo look-up table. Functions that don't have an entry in the table are not optimized with a local stage.


func MakeExpression

func MakeExpression(expr parser.TypedExpr, indexVarMap []int) distsqlrun.Expression

MakeExpression creates a distsqlrun.Expression.

The distsqlrun.Expression uses the placeholder syntax (@1, @2, @3..) to refer to columns.

The expr uses IndexedVars to refer to columns. The caller can optionally remap these columns by passing an indexVarMap: an IndexedVar with index i becomes column indexVarMap[i].
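The remapping rule can be illustrated on plain strings. The sketch below is not the package's implementation: `remapPlaceholders` is a hypothetical helper, and it assumes that IndexedVar `i` is written as placeholder `@(i+1)`, so that the 0-based index maps onto the 1-based placeholder syntax.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// placeholderRE matches 1-based column placeholders such as @1 or @12.
var placeholderRE = regexp.MustCompile(`@(\d+)`)

// remapPlaceholders is a hypothetical helper (not part of the package). It
// takes an expression string in which @(i+1) stands for IndexedVar i and
// rewrites it to refer to column indexVarMap[i] instead.
// A nil indexVarMap leaves the expression unchanged.
func remapPlaceholders(expr string, indexVarMap []int) string {
	if indexVarMap == nil {
		return expr
	}
	return placeholderRE.ReplaceAllStringFunc(expr, func(m string) string {
		n, err := strconv.Atoi(m[1:])
		if err != nil || n < 1 || n > len(indexVarMap) {
			return m // leave anything we cannot remap untouched
		}
		return "@" + strconv.Itoa(indexVarMap[n-1]+1)
	})
}

func main() {
	// IndexedVar 0 becomes column 2 and IndexedVar 1 becomes column 0.
	fmt.Println(remapPlaceholders("@1 + @2 > 10", []int{2, 0}))
}
```

With the map `[]int{2, 0}`, the expression `@1 + @2 > 10` is rewritten to `@3 + @1 > 10`.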

func MakeTypeIndexedVarHelper

func MakeTypeIndexedVarHelper(types []sqlbase.ColumnType) parser.IndexedVarHelper

MakeTypeIndexedVarHelper returns an IndexedVarHelper which creates IndexedVars with the given types.

func MergePlans

func MergePlans(
	left, right *PhysicalPlan,
) (mergedPlan PhysicalPlan, leftRouters []ProcessorIdx, rightRouters []ProcessorIdx)

MergePlans merges the processors and streams of two plans into a new plan. The result routers for each side are also returned (they point at processors in the merged plan).
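One plausible way to keep the returned routers valid in the merged plan is to shift every index that comes from the right-hand plan by the number of left-hand processors. The sketch below uses simplified stand-in types (`Plan`, a string per processor) and is an assumption about the approach, not the package's code.

```go
package main

import "fmt"

// Simplified stand-ins for the package's types; only what the sketch needs.
type ProcessorIdx int

type Stream struct {
	SourceProcessor, DestProcessor ProcessorIdx
}

type Plan struct {
	Processors    []string // processor names stand in for full Processor values
	Streams       []Stream
	ResultRouters []ProcessorIdx
}

// mergePlans concatenates the processors and streams of two plans. Indices
// coming from the right plan are shifted by the number of left processors so
// that they remain valid in the merged plan.
func mergePlans(left, right *Plan) (merged Plan, leftRouters, rightRouters []ProcessorIdx) {
	shift := ProcessorIdx(len(left.Processors))

	merged.Processors = append(merged.Processors, left.Processors...)
	merged.Processors = append(merged.Processors, right.Processors...)

	merged.Streams = append(merged.Streams, left.Streams...)
	for _, s := range right.Streams {
		merged.Streams = append(merged.Streams, Stream{
			SourceProcessor: s.SourceProcessor + shift,
			DestProcessor:   s.DestProcessor + shift,
		})
	}

	leftRouters = append(leftRouters, left.ResultRouters...)
	for _, r := range right.ResultRouters {
		rightRouters = append(rightRouters, r+shift)
	}
	return merged, leftRouters, rightRouters
}

func main() {
	left := &Plan{
		Processors:    []string{"scanA", "filterA"},
		Streams:       []Stream{{SourceProcessor: 0, DestProcessor: 1}},
		ResultRouters: []ProcessorIdx{1},
	}
	right := &Plan{Processors: []string{"scanB"}, ResultRouters: []ProcessorIdx{0}}
	merged, l, r := mergePlans(left, right)
	fmt.Println(len(merged.Processors), l, r)
}
```

Here the right plan's only processor ends up at index 2 of the merged plan, so its result router is reported as 2 rather than 0.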


type DistAggregationInfo

type DistAggregationInfo struct {
	// The local stage consists of one or more aggregations. All aggregations have
	// the same input.
	LocalStage []distsqlrun.AggregatorSpec_Func

	// The final stage consists of the same number of aggregations as the local
	// stage (the input of each one is the corresponding result from each instance
	// of the local stage).
	FinalStage []distsqlrun.AggregatorSpec_Func

	// An optional rendering expression used to obtain the final result; required
	// if there is more than one aggregation in each of the stages.
	// Conceptually this is an expression that has access to the final stage
	// results (via IndexedVars), to be run as the PostProcessing step of the
	// final stage processor.  However, there are some complications:
	//   - this structure is a blueprint for aggregating inputs of different
	//     types, and in some cases the expression may be different depending on
	//     the types (see AVG below).
	//   - we support combining multiple "top level" aggregations into the same
	//     processors, so the correct indexing of the input variables is not
	//     predetermined.
	// Instead of defining a canonical non-typed expression and then tweaking it
	// with visitors, we use a function that directly creates a typed expression
	// on demand. The expression will refer to the final stage results using
	// IndexedVars, with indices shifted by varIdxOffset.
	FinalRendering func(h *parser.IndexedVarHelper, varIdxOffset int) (parser.TypedExpr, error)
}

DistAggregationInfo is a blueprint for planning distributed aggregations. It describes two stages - a local stage performs local aggregations wherever data is available and generates partial results, and a final stage aggregates the partial results from all data "partitions".

The simplest example is SUM: the local stage computes the SUM of the items on each node, and a final stage SUMs those partial sums into a final sum. Similar functions are MIN, MAX, BOOL_AND, BOOL_OR.

A less trivial example is COUNT: the local stage counts (COUNT), the final stage adds the counts (SUM_INT).

A more complex example is AVG, for which we have to do *multiple* aggregations in each stage: we need to get a sum and a count, so the local stage does SUM and COUNT, and the final stage does SUM and SUM_INT. We also need an expression that takes these two values and generates the final AVG result.
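The AVG case can be sketched outside the planner with plain Go values. The types and function names below (`partial`, `localStage`, `finalStage`, `finalRendering`) are hypothetical and only mirror the roles described above; the sketch does not handle an empty input.

```go
package main

import "fmt"

// partial holds the local-stage results for one data partition:
// a SUM and a COUNT over the same input.
type partial struct {
	sum   float64
	count int64
}

// localStage runs the two local aggregations (SUM and COUNT) on one partition.
func localStage(values []float64) partial {
	var p partial
	for _, v := range values {
		p.sum += v
		p.count++
	}
	return p
}

// finalStage combines the partial results: SUM over the partial sums and
// SUM_INT over the partial counts.
func finalStage(parts []partial) partial {
	var total partial
	for _, p := range parts {
		total.sum += p.sum
		total.count += p.count
	}
	return total
}

// finalRendering plays the role of the rendering expression: sum / count,
// with the integer count converted to float first.
func finalRendering(total partial) float64 {
	return total.sum / float64(total.count)
}

func main() {
	partitions := [][]float64{{1, 2, 3}, {4, 5}, {6}}
	parts := make([]partial, 0, len(partitions))
	for _, values := range partitions {
		parts = append(parts, localStage(values))
	}
	fmt.Println(finalRendering(finalStage(parts))) // prints 3.5
}
```

Averaging the per-partition averages would give a different (wrong) answer whenever partitions have different sizes, which is why the sum and the count travel separately until the final rendering.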

type LeaseHolderChoosingPolicy

type LeaseHolderChoosingPolicy byte

LeaseHolderChoosingPolicy enumerates the implementors of leaseHolderOracle.

const (
	// RandomLeaseHolderChoice chooses lease holders randomly.
	RandomLeaseHolderChoice LeaseHolderChoosingPolicy = iota
	// BinPackingLeaseHolderChoice bin-packs the choices.
	BinPackingLeaseHolderChoice
)

type PhysicalPlan

type PhysicalPlan struct {
	// Processors in the plan.
	Processors []Processor

	// Streams accumulates the streams in the plan - both local (intra-node) and
	// remote (inter-node); when we have a final plan, the streams are used to
	// generate processor input and output specs (see PopulateEndpoints).
	Streams []Stream

	// ResultRouters identifies the output routers which output the results of the
	// plan. These are the routers to which we have to connect new streams in
	// order to extend the plan.
	// The processors which have these routers are all part of the same "stage":
	// they have the same "schema" and PostProcessSpec.
	// We assume all processors have a single output so we only need the processor
	// index.
	ResultRouters []ProcessorIdx

	// ResultTypes is the schema (column types) of the rows produced by the
	// ResultRouters.
	// This is aliased with InputSyncSpec.ColumnTypes, so it must not be modified
	// in-place during planning.
	ResultTypes []sqlbase.ColumnType

	// MergeOrdering is the ordering guarantee for the result streams that must be
	// maintained when the streams eventually merge. The column indexes refer to
	// columns for the rows produced by ResultRouters.
	// Empty when there is a single result router. The reason is that maintaining
	// an ordering sometimes requires adding columns to streams for the sole
	// purpose of correctly merging the streams later (see AddProjection); we don't
	// want to pay this cost if we don't have multiple streams to merge.
	MergeOrdering distsqlrun.Ordering
}

PhysicalPlan represents a network of processors and streams along with information about the results output by this network. The results come from unconnected output routers of a subset of processors; all these routers output the same kind of data (same schema).

func (*PhysicalPlan) AddFilter

func (p *PhysicalPlan) AddFilter(expr parser.TypedExpr, indexVarMap []int)

AddFilter adds a filter on the output of a plan. The filter is added either as a post-processing step to the last stage or to a new "no-op" stage, as necessary.

See MakeExpression for a description of indexVarMap.

func (*PhysicalPlan) AddLimit

func (p *PhysicalPlan) AddLimit(count int64, offset int64, node roachpb.NodeID) error

AddLimit adds a limit and/or offset to the results of the current plan. If there are multiple result streams, they are joined into a single processor that is placed on the given node.

For no limit, count should be MaxInt64.
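The count and offset semantics, including the MaxInt64 convention, can be sketched on an in-memory slice. `applyLimit` is a hypothetical helper for illustration and says nothing about how the plan's processors apply the limit.

```go
package main

import (
	"fmt"
	"math"
)

// applyLimit is a hypothetical helper: it skips the first offset rows and then
// keeps at most count rows. Passing math.MaxInt64 as count means "no limit".
// Negative arguments are treated as zero.
func applyLimit(rows []int, count, offset int64) []int {
	if offset < 0 {
		offset = 0
	}
	if count < 0 {
		count = 0
	}
	if n := int64(len(rows)); offset > n {
		offset = n
	}
	rows = rows[offset:]
	if count < int64(len(rows)) {
		rows = rows[:count]
	}
	return rows
}

func main() {
	rows := []int{1, 2, 3, 4, 5}
	fmt.Println(applyLimit(rows, 2, 1), applyLimit(rows, math.MaxInt64, 3))
}
```

The first call skips one row and keeps two; the second only applies the offset because the count is MaxInt64.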

func (*PhysicalPlan) AddNoGroupingStage

func (p *PhysicalPlan) AddNoGroupingStage(
	core distsqlrun.ProcessorCoreUnion,
	post distsqlrun.PostProcessSpec,
	outputTypes []sqlbase.ColumnType,
	newOrdering distsqlrun.Ordering,
)

AddNoGroupingStage adds a processor for each result router, on the same node as the source of the stream; all processors have the same core. This is for stages that correspond to logical blocks that don't require any grouping (e.g. evaluator, sorting, etc).

func (*PhysicalPlan) AddProcessor

func (p *PhysicalPlan) AddProcessor(proc Processor) ProcessorIdx

AddProcessor adds a processor to a PhysicalPlan and returns the index that can be used to refer to that processor.

func (*PhysicalPlan) AddProjection

func (p *PhysicalPlan) AddProjection(columns []uint32)

AddProjection applies a projection to a plan. The new plan outputs the columns of the old plan as listed in the slice. The Ordering is updated; columns in the ordering are added to the projection as needed.

Note: the columns slice is relinquished to this function, which can modify it or use it directly in specs.
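The interaction between a projection and the ordering can be sketched as follows. The function `addProjection` below is a hypothetical, simplified stand-in that works on plain column indices; it assumes the ordering is a list of input column indices and ignores sort directions.

```go
package main

import "fmt"

// addProjection returns the list of input columns to output and the ordering
// expressed in terms of output positions. Any ordering column that is missing
// from the requested projection is appended so the ordering can be maintained.
func addProjection(columns []uint32, ordering []uint32) (outCols []uint32, newOrdering []uint32) {
	outCols = append(outCols, columns...)
	for _, ordCol := range ordering {
		pos := -1
		for i, c := range outCols {
			if c == ordCol {
				pos = i
				break
			}
		}
		if pos == -1 {
			// The ordering needs a column the projection dropped; add it back.
			outCols = append(outCols, ordCol)
			pos = len(outCols) - 1
		}
		newOrdering = append(newOrdering, uint32(pos))
	}
	return outCols, newOrdering
}

func main() {
	// Keep input columns 3 and 0; results stay ordered by input columns 0, then 2.
	cols, ord := addProjection([]uint32{3, 0}, []uint32{0, 2})
	fmt.Println(cols, ord)
}
```

In this example input column 2 is not requested but is needed for the ordering, so it is appended as a third output column and the ordering becomes output positions 1 and 2.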

func (*PhysicalPlan) AddRendering

func (p *PhysicalPlan) AddRendering(
	exprs []parser.TypedExpr, indexVarMap []int, outTypes []sqlbase.ColumnType,
)

AddRendering adds a rendering (expression evaluation) to the output of a plan. The rendering is achieved either through an adjustment on the last stage post-process spec, or via a new stage.

The Ordering is updated; columns in the ordering are added to the render expressions as necessary.

See MakeExpression for a description of indexVarMap.

func (*PhysicalPlan) AddSingleGroupStage

func (p *PhysicalPlan) AddSingleGroupStage(
	nodeID roachpb.NodeID,
	core distsqlrun.ProcessorCoreUnion,
	post distsqlrun.PostProcessSpec,
	outputTypes []sqlbase.ColumnType,
)

AddSingleGroupStage adds a "single group" stage (one that cannot be parallelized) which consists of a single processor on the specified node. All result routers of the previous stage (ResultRouters) are connected to this processor.

func (*PhysicalPlan) GenerateFlowSpecs

func (p *PhysicalPlan) GenerateFlowSpecs() map[roachpb.NodeID]distsqlrun.FlowSpec

GenerateFlowSpecs takes a plan (with populated endpoints) and generates the set of FlowSpecs (one per node involved in the plan).
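The "one FlowSpec per node" idea amounts to grouping the plan's processors by the node they run on. The sketch below uses simplified stand-in types (`NodeID`, `Processor` with a name, `FlowSpec` as a list of names) and is only meant to show the grouping, not the real spec contents.

```go
package main

import "fmt"

// NodeID and Processor are simplified stand-ins for the real types.
type NodeID int

type Processor struct {
	Node NodeID
	Name string // stands in for the real processor spec
}

// FlowSpec here is just the list of processors scheduled on one node.
type FlowSpec struct {
	Processors []string
}

// generateFlowSpecs groups processors by node, producing one FlowSpec per
// node that hosts at least one processor.
func generateFlowSpecs(procs []Processor) map[NodeID]FlowSpec {
	flows := make(map[NodeID]FlowSpec)
	for _, p := range procs {
		f := flows[p.Node]
		f.Processors = append(f.Processors, p.Name)
		flows[p.Node] = f
	}
	return flows
}

func main() {
	flows := generateFlowSpecs([]Processor{
		{Node: 1, Name: "scan1"},
		{Node: 2, Name: "scan2"},
		{Node: 1, Name: "agg"},
	})
	fmt.Println(len(flows), flows[1].Processors)
}
```

Nodes that host no processor get no entry in the map, matching the "one per node involved in the plan" wording.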

func (*PhysicalPlan) GetLastStagePost

func (p *PhysicalPlan) GetLastStagePost() distsqlrun.PostProcessSpec

GetLastStagePost returns the PostProcessSpec for the processors in the last stage (ResultRouters).

func (*PhysicalPlan) MergeResultStreams

func (p *PhysicalPlan) MergeResultStreams(
	resultRouters []ProcessorIdx,
	sourceRouterSlot int,
	ordering distsqlrun.Ordering,
	destProcessor ProcessorIdx,
	destInput int,
)

MergeResultStreams connects a set of resultRouters to a synchronizer. The synchronizer is configured with the provided ordering.

func (*PhysicalPlan) PopulateEndpoints

func (p *PhysicalPlan) PopulateEndpoints(nodeAddresses map[roachpb.NodeID]string)

PopulateEndpoints processes p.Streams and adds the corresponding StreamEndpointSpecs to the processors' input and output specs. This should be used when the plan is completed and ready to be executed.

The nodeAddresses map contains the addresses of all the nodes referenced in the plan.

func (*PhysicalPlan) SetLastStagePost

func (p *PhysicalPlan) SetLastStagePost(
	post distsqlrun.PostProcessSpec, outputTypes []sqlbase.ColumnType,
)

SetLastStagePost changes the PostProcess spec of the processors in the last stage (ResultRouters). The caller must update the ordering via SetMergeOrdering.

func (*PhysicalPlan) SetMergeOrdering

func (p *PhysicalPlan) SetMergeOrdering(o distsqlrun.Ordering)

SetMergeOrdering sets p.MergeOrdering.

type Processor

type Processor struct {
	// Node where the processor must be instantiated.
	Node roachpb.NodeID

	// Spec for the processor; note that the StreamEndpointSpecs in the input
	// synchronizers and output routers are not set until the end of the planning
	// process.
	Spec distsqlrun.ProcessorSpec
}

Processor contains the information associated with a processor in a plan.

type ProcessorIdx

type ProcessorIdx int

ProcessorIdx identifies a processor by its index in PhysicalPlan.Processors.

type SpanResolver

type SpanResolver interface {
	// NewSpanResolverIterator creates a new SpanResolverIterator.
	// The txn is only used by the "fake" implementation (used for testing).
	NewSpanResolverIterator(txn *client.Txn) SpanResolverIterator
}

SpanResolver resolves key spans to their respective ranges and lease holders. Used for planning physical execution of distributed SQL queries.

Sample usage for resolving a bunch of spans:

// distSender, gossip, nodeDescriptor and choosingPolicy are assumed to be
// available in the enclosing scope.
func resolveSpans(
	ctx context.Context,
	spans ...spanWithDir,
) ([][]kv.ReplicaInfo, error) {
	lr := distsql.NewSpanResolver(
		distSender, gossip, nodeDescriptor,
		choosingPolicy, // a LeaseHolderChoosingPolicy picked by the caller
	)
	it := lr.NewSpanResolverIterator(nil)
	res := make([][]kv.ReplicaInfo, 0)
	for _, span := range spans {
		repls := make([]kv.ReplicaInfo, 0)
		for it.Seek(ctx, span.Span, span.dir); ; it.Next(ctx) {
			if !it.Valid() {
				return nil, it.Error()
			}
			repl, err := it.ReplicaInfo(ctx)
			if err != nil {
				return nil, err
			}
			repls = append(repls, repl)
			// Stop once the current range covers the end of the span.
			if !it.NeedAnother() {
				break
			}
		}
		res = append(res, repls)
	}
	return res, nil
}


func NewFakeSpanResolver

func NewFakeSpanResolver(nodes []*roachpb.NodeDescriptor) SpanResolver

NewFakeSpanResolver creates a fake span resolver.

func NewSpanResolver

func NewSpanResolver(
	distSender *kv.DistSender,
	gossip *gossip.Gossip,
	nodeDesc roachpb.NodeDescriptor,
	choosingPolicy LeaseHolderChoosingPolicy,
) SpanResolver

NewSpanResolver creates a new spanResolver.

type SpanResolverIterator

type SpanResolverIterator interface {
	// Seek positions the iterator on the start of a span (span.Key or
	// span.EndKey, depending on ScanDir). Note that span.EndKey is exclusive,
	// regardless of scanDir.
	// After calling this, ReplicaInfo() will return information about the range
	// containing the start key of the span (or the end key, if the direction is
	// Descending).
	// NeedAnother() will return true until the iterator is positioned on or after
	// the end of the span.  Possible errors encountered should be checked for
	// with Valid().
	// Seek can be called repeatedly on the same iterator. To make optimal use of
	// caches, Seek()s should be performed on spans sorted according to the
	// scanDir (if Descending, then the span with the highest keys should be
	// Seek()ed first).
	// scanDir changes the direction in which Next() will advance the iterator.
	Seek(ctx context.Context, span roachpb.Span, scanDir kv.ScanDirection)

	// NeedAnother returns true if the current range is not the last for the span
	// that was last Seek()ed.
	NeedAnother() bool

	// Next advances the iterator to the next range. The next range contains the
	// last range's end key (but it does not necessarily start there, because of
	// asynchronous range splits and caching effects).
	// Possible errors encountered should be checked for with Valid().
	Next(ctx context.Context)

	// Valid returns false if an error was encountered by the last Seek() or Next().
	Valid() bool

	// Error returns any error encountered by the last Seek() or Next().
	Error() error

	// Desc returns the current RangeDescriptor.
	Desc() roachpb.RangeDescriptor

	// ReplicaInfo returns information about the replica that has been picked for
	// the current range.
	// A RangeUnavailableError is returned if there's no information in gossip
	// about any of the replicas.
	ReplicaInfo(ctx context.Context) (kv.ReplicaInfo, error)
}

SpanResolverIterator is used to iterate over the ranges composing a key span.
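The Seek / Valid / NeedAnother / Next protocol can be tried out against a small in-memory stand-in. Everything in the sketch below (`keyRange`, `fakeIterator`, `rangesForSpan`) is hypothetical: it uses string keys, supports only ascending scans, and omits the context and ReplicaInfo parts of the real interface.

```go
package main

import "fmt"

// keyRange is a hypothetical stand-in for a range descriptor: [start, end).
type keyRange struct{ start, end string }

// fakeIterator walks a sorted, contiguous list of ranges in ascending order.
type fakeIterator struct {
	ranges  []keyRange
	idx     int
	spanEnd string
	err     error
}

// Seek positions the iterator on the range containing the span's start key.
func (it *fakeIterator) Seek(start, end string) {
	it.spanEnd, it.err = end, nil
	for i, r := range it.ranges {
		if r.start <= start && start < r.end {
			it.idx = i
			return
		}
	}
	it.err = fmt.Errorf("no range contains key %q", start)
}

// Valid returns false if the last Seek or Next hit an error.
func (it *fakeIterator) Valid() bool { return it.err == nil }

// Error returns the error from the last Seek or Next, if any.
func (it *fakeIterator) Error() error { return it.err }

// NeedAnother reports whether the span extends past the current range.
func (it *fakeIterator) NeedAnother() bool { return it.ranges[it.idx].end < it.spanEnd }

// Next advances to the following range.
func (it *fakeIterator) Next() {
	if it.idx+1 >= len(it.ranges) {
		it.err = fmt.Errorf("no range after %q", it.ranges[it.idx].end)
		return
	}
	it.idx++
}

// rangesForSpan collects every range that overlaps [start, end).
func rangesForSpan(it *fakeIterator, start, end string) ([]keyRange, error) {
	var out []keyRange
	for it.Seek(start, end); ; it.Next() {
		if !it.Valid() {
			return nil, it.Error()
		}
		out = append(out, it.ranges[it.idx])
		if !it.NeedAnother() {
			break
		}
	}
	return out, nil
}

func main() {
	it := &fakeIterator{ranges: []keyRange{{"a", "g"}, {"g", "p"}, {"p", "z"}}}
	got, err := rangesForSpan(it, "c", "k")
	fmt.Println(got, err)
}
```

For the span [c, k) the loop visits the ranges [a, g) and [g, p) and then stops, because the second range already extends past the end of the span.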

type Stream

type Stream struct {
	// SourceProcessor index (within the same plan).
	SourceProcessor ProcessorIdx

	// SourceRouterSlot identifies the position of this stream among the streams
	// that originate from the same router. This is important when routing by hash
	// where the order of the streams in the OutputRouterSpec matters.
	SourceRouterSlot int

	// DestProcessor index (within the same plan).
	DestProcessor ProcessorIdx

	// DestInput identifies the input of DestProcessor (some processors have
	// multiple inputs).
	DestInput int
}

Stream connects the output router of one processor to an input synchronizer of another processor.
