exec

Version: v0.0.0-...-261b5b0
Published: May 12, 2016 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package exec provides execution tasks and an executor for a DAG of plan tasks. These can be embedded and used as-is, or extended through the Executor interface.

Index

Constants

const (
	ItemDefaultChannelSize = 50
)
const (
	MaxAllowedPacket = 1024 * 1024
)
const (
	MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)

Variables

var (
	// Standard errors
	ErrShuttingDown     = fmt.Errorf("Received Shutdown Signal")
	ErrNotSupported     = fmt.Errorf("QLBridge: Not supported")
	ErrNotImplemented   = fmt.Errorf("QLBridge: Not implemented")
	ErrUnknownCommand   = fmt.Errorf("QLBridge: Unknown Command")
	ErrInternalError    = fmt.Errorf("QLBridge: Internal Error")
	ErrNoSchemaSelected = fmt.Errorf("No Schema Selected")
)

Functions

func DisableRecover

func DisableRecover()

func RegisterSqlDriver

func RegisterSqlDriver()

Types

type AggFunc

type AggFunc func(v value.Value)

type AggPartial

type AggPartial struct {
	Ct int64
	N  float64
}

type Aggregator

type Aggregator interface {
	Do(v value.Value)
	Result() interface{}
	Reset()
	Merge(*AggPartial)
}

func NewAvg

func NewAvg(col *rel.Column, partial bool) Aggregator

func NewCount

func NewCount(col *rel.Column) Aggregator

func NewGroupByValue

func NewGroupByValue(col *rel.Column) Aggregator

func NewSum

func NewSum(col *rel.Column, partial bool) Aggregator
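The Aggregator interface and AggPartial suggest a two-phase design: workers emit partial results, and a finalizer merges them. The following is a hypothetical illustration of that idea for AVG, not the package's implementation. It replaces value.Value with float64 and assumes that AggPartial holds a count (Ct) and a running sum (N).

```go
package main

import "fmt"

// AggPartial mirrors the documented struct; here Ct is assumed to be a row
// count and N a running sum.
type AggPartial struct {
	Ct int64
	N  float64
}

// Aggregator is a simplified version of the documented interface; float64
// stands in for value.Value (an assumption to stay stdlib-only).
type Aggregator interface {
	Do(v float64)
	Result() interface{}
	Reset()
	Merge(*AggPartial)
}

// avg is a hypothetical AVG aggregator. In partial mode it returns the
// undivided (count, sum) pair so a downstream finalizer can merge partials.
type avg struct {
	partial bool
	ct      int64
	n       float64
}

var _ Aggregator = (*avg)(nil)

func newAvg(partial bool) *avg { return &avg{partial: partial} }

func (m *avg) Do(v float64) { m.ct++; m.n += v }

func (m *avg) Merge(p *AggPartial) { m.ct += p.Ct; m.n += p.N }

func (m *avg) Reset() { m.ct, m.n = 0, 0 }

func (m *avg) Result() interface{} {
	if m.partial {
		return &AggPartial{Ct: m.ct, N: m.n}
	}
	if m.ct == 0 {
		return 0.0
	}
	return m.n / float64(m.ct)
}

func main() {
	// Two workers each compute a partial average over their own shard.
	w1, w2 := newAvg(true), newAvg(true)
	for _, v := range []float64{1, 2, 3} {
		w1.Do(v)
	}
	w2.Do(10)

	// A final aggregator merges the partials (roughly the finalizer role).
	final := newAvg(false)
	final.Merge(w1.Result().(*AggPartial))
	final.Merge(w2.Result().(*AggPartial))
	fmt.Println(final.Result()) // 4
}
```

Merging (count, sum) pairs yields the correct overall average of 16/4 = 4. Averaging the two per-worker averages (2 and 10) would instead give 6, which is why a partial needs to carry both fields.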

type Command

type Command struct {
	*TaskBase
	// contains filtered or unexported fields
}

Command is an executable task for SET SQL commands.

func NewCommand

func NewCommand(ctx *plan.Context, p *plan.Command) *Command

NewCommand creates a new Command exec task.

func (*Command) Close

func (m *Command) Close() error

Close Command

func (*Command) Run

func (m *Command) Run() error

Run Command

type DeletionScanner

type DeletionScanner struct {
	*DeletionTask
}

DeletionScanner is a delete task that scans the source; it is used when the source does not support a seek operation.

func (*DeletionScanner) Run

func (m *DeletionScanner) Run() error

type DeletionTask

type DeletionTask struct {
	*TaskBase
	// contains filtered or unexported fields
}

DeletionTask is a delete task for sources that natively support delete.

func NewDelete

func NewDelete(ctx *plan.Context, p *plan.Delete) *DeletionTask

NewDelete creates a DeletionTask that deletes from a data source.

func (*DeletionTask) Close

func (m *DeletionTask) Close() error

func (*DeletionTask) Run

func (m *DeletionTask) Run() error

type ErrChan

type ErrChan chan error

type Executor

type Executor interface {
	NewTask(p plan.Task) Task
	WalkPlan(p plan.Task) (Task, error)
	WalkSelect(p *plan.Select) (Task, error)
	WalkInsert(p *plan.Insert) (Task, error)
	WalkUpsert(p *plan.Upsert) (Task, error)
	WalkUpdate(p *plan.Update) (Task, error)
	WalkDelete(p *plan.Delete) (Task, error)
	WalkCommand(p *plan.Command) (Task, error)
	WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)

	// Child Tasks
	WalkSource(p *plan.Source) (Task, error)
	WalkJoin(p *plan.JoinMerge) (Task, error)
	WalkJoinKey(p *plan.JoinKey) (Task, error)
	WalkWhere(p *plan.Where) (Task, error)
	WalkHaving(p *plan.Having) (Task, error)
	WalkGroupBy(p *plan.GroupBy) (Task, error)
	WalkProjection(p *plan.Projection) (Task, error)
}

Executor defines the standard Walk() pattern used to create an executable task DAG from a plan DAG.

An implementation of WalkPlan() is able to execute/run a Statement, either:

  • inproc: i.e., in process. The qlbridge/exec package implements a non-distributed query planner.
  • distributed: i.e., run this job across multiple servers. dataux/planner implements a distributed query planner that distributes/runs tasks across multiple nodes.
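The following is a minimal sketch of the Walk pattern. It assumes a drastically simplified plan model: the plan node types, the walker type, and render are hypothetical stand-ins for the plan package and JobExecutor. Each Walk step dispatches on the concrete plan node type and recurses so that the resulting task tree mirrors the plan DAG.

```go
package main

import (
	"fmt"
	"strings"
)

// PlanTask and the Plan* types are hypothetical, heavily simplified plan
// nodes; the real ones live in the qlbridge plan package.
type PlanTask interface{ Children() []PlanTask }

type PlanSource struct{ Table string }
type PlanWhere struct{ Child PlanTask }
type PlanProjection struct{ Child PlanTask }

func (PlanSource) Children() []PlanTask       { return nil }
func (p PlanWhere) Children() []PlanTask      { return []PlanTask{p.Child} }
func (p PlanProjection) Children() []PlanTask { return []PlanTask{p.Child} }

// Task is a stand-in for an executable task: a name plus child tasks.
type Task struct {
	Name     string
	Children []*Task
}

// walker plays the role of an Executor: it maps each plan node type to a task.
type walker struct{}

func (w walker) WalkPlan(p PlanTask) (*Task, error) {
	var t *Task
	switch n := p.(type) {
	case PlanSource:
		t = &Task{Name: "source(" + n.Table + ")"}
	case PlanWhere:
		t = &Task{Name: "where"}
	case PlanProjection:
		t = &Task{Name: "projection"}
	default:
		return nil, fmt.Errorf("unsupported plan node %T", p)
	}
	// Recurse so the task tree mirrors the plan tree.
	for _, c := range p.Children() {
		ct, err := w.WalkPlan(c)
		if err != nil {
			return nil, err
		}
		t.Children = append(t.Children, ct)
	}
	return t, nil
}

// render prints a task tree as nested calls for easy inspection.
func render(t *Task) string {
	if len(t.Children) == 0 {
		return t.Name
	}
	parts := make([]string, len(t.Children))
	for i, c := range t.Children {
		parts[i] = render(c)
	}
	return t.Name + "(" + strings.Join(parts, ",") + ")"
}

func main() {
	p := PlanProjection{Child: PlanWhere{Child: PlanSource{Table: "users"}}}
	task, err := walker{}.WalkPlan(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(render(task)) // projection(where(source(users)))
}
```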

type ExecutorSource

type ExecutorSource interface {
	// given our plan, turn that into a Task.
	WalkExecSource(p *plan.Source) (Task, error)
}

ExecutorSource lets sources do their own execution plan for sub-select statements. For example, MySQL can do its own (select, projection), and Mongo and Elasticsearch can as well.

This interface allows select planning to be passed down to the source.

type GroupBy

type GroupBy struct {
	*TaskBase
	// contains filtered or unexported fields
}

GroupBy is the SQL GROUP BY operator.

It creates a hashable key composed of each value of each column in the group-by: key = {each,value,of,column,in,groupby}.

This is a naive parallel group-by that holds values in memory.

task   ->  groupby  -->
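The following hypothetical sketch shows the key construction and in-memory grouping described above, computing COUNT(*) per group. Rows are modeled as plain maps rather than schema.Message. The NUL separator is an assumption: it must not appear in column values, or distinct groups could collide on the same key.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Row is a hypothetical message: column name -> value.
type Row map[string]string

// groupKey builds a hashable key from each group-by column's value, in
// group-by order, joined with a NUL separator.
func groupKey(r Row, cols []string) string {
	vals := make([]string, len(cols))
	for i, c := range cols {
		vals[i] = r[c]
	}
	return strings.Join(vals, "\x00")
}

// groupCount is a naive in-memory GROUP BY ... COUNT(*): every group is held
// in a map until the input is exhausted.
func groupCount(rows []Row, cols []string) map[string]int {
	groups := make(map[string]int)
	for _, r := range rows {
		groups[groupKey(r, cols)]++
	}
	return groups
}

func main() {
	rows := []Row{
		{"country": "us", "plan": "free"},
		{"country": "us", "plan": "pro"},
		{"country": "us", "plan": "free"},
		{"country": "de", "plan": "free"},
	}
	g := groupCount(rows, []string{"country", "plan"})

	keys := make([]string, 0, len(g))
	for k := range g {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	for _, k := range keys {
		fmt.Printf("%s => %d\n", strings.ReplaceAll(k, "\x00", ","), g[k])
	}
	// de,free => 1
	// us,free => 2
	// us,pro => 1
}
```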

func NewGroupBy

func NewGroupBy(ctx *plan.Context, p *plan.GroupBy) *GroupBy

func (*GroupBy) Close

func (m *GroupBy) Close() error

func (*GroupBy) Run

func (m *GroupBy) Run() error

type GroupByFinal

type GroupByFinal struct {
	*TaskBase
	// contains filtered or unexported fields
}

GroupByFinal is the SQL GROUP BY operator finalizer for partial aggregates.

func NewGroupByFinal

func NewGroupByFinal(ctx *plan.Context, p *plan.GroupBy) *GroupByFinal

func (*GroupByFinal) Close

func (m *GroupByFinal) Close() error

func (*GroupByFinal) Run

func (m *GroupByFinal) Run() error

type JobExecutor

type JobExecutor struct {
	Planner  plan.Planner
	Executor Executor
	RootTask TaskRunner
	Ctx      *plan.Context
	// contains filtered or unexported fields
}

JobExecutor translates a SQL statement into an execution DAG of tasks using the supplied Planner and Executor. This package implements a default executor and uses the default Planner from plan. The result is a single-node DAG of tasks.

func BuildSqlJob

func BuildSqlJob(ctx *plan.Context) (*JobExecutor, error)

func NewExecutor

func NewExecutor(ctx *plan.Context, planner plan.Planner) *JobExecutor

func (*JobExecutor) Close

func (m *JobExecutor) Close() error

Close performs the normal close of the root task.

func (*JobExecutor) DrainChan

func (m *JobExecutor) DrainChan() MessageChan

DrainChan returns the drain: the last out channel, on the last task.

func (*JobExecutor) NewTask

func (m *JobExecutor) NewTask(p plan.Task) Task

func (*JobExecutor) Run

func (m *JobExecutor) Run() error

Run runs this task.

func (*JobExecutor) Setup

func (m *JobExecutor) Setup() error

Setup sets up this DAG of tasks.

func (*JobExecutor) WalkChildren

func (m *JobExecutor) WalkChildren(p plan.Task, root Task) error

WalkChildren walks the DAG of plan tasks, creating execution tasks.

func (*JobExecutor) WalkCommand

func (m *JobExecutor) WalkCommand(p *plan.Command) (Task, error)

func (*JobExecutor) WalkDelete

func (m *JobExecutor) WalkDelete(p *plan.Delete) (Task, error)

func (*JobExecutor) WalkGroupBy

func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error)

func (*JobExecutor) WalkHaving

func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error)

func (*JobExecutor) WalkInsert

func (m *JobExecutor) WalkInsert(p *plan.Insert) (Task, error)

func (*JobExecutor) WalkJoin

func (m *JobExecutor) WalkJoin(p *plan.JoinMerge) (Task, error)

func (*JobExecutor) WalkJoinKey

func (m *JobExecutor) WalkJoinKey(p *plan.JoinKey) (Task, error)

func (*JobExecutor) WalkPlan

func (m *JobExecutor) WalkPlan(p plan.Task) (Task, error)

WalkPlan is the main entry point: it takes a Plan and converts it into an execution DAG.

func (*JobExecutor) WalkPlanAll

func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error)

func (*JobExecutor) WalkPlanTask

func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error)

func (*JobExecutor) WalkPreparedStatement

func (m *JobExecutor) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)

func (*JobExecutor) WalkProjection

func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error)

func (*JobExecutor) WalkSelect

func (m *JobExecutor) WalkSelect(p *plan.Select) (Task, error)

func (*JobExecutor) WalkSource

func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error)

func (*JobExecutor) WalkSourceExec

func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error)

func (*JobExecutor) WalkUpdate

func (m *JobExecutor) WalkUpdate(p *plan.Update) (Task, error)

func (*JobExecutor) WalkUpsert

func (m *JobExecutor) WalkUpsert(p *plan.Upsert) (Task, error)

func (*JobExecutor) WalkWhere

func (m *JobExecutor) WalkWhere(p *plan.Where) (Task, error)

type JobMaker

type JobMaker func(ctx *plan.Context) (Executor, error)

Job Factory

type JobRunner

type JobRunner interface {
	Setup() error
	Run() error
	Close() error
}

JobRunner is the main runtime interface for running a SQL job of tasks.

type JoinKey

type JoinKey struct {
	*TaskBase
	// contains filtered or unexported fields
}

JoinKey evaluates messages to create JoinKey-based messages, where the join key (a composite of each value in the join expression) hashes consistently.

func NewJoinKey

func NewJoinKey(ctx *plan.Context, p *plan.JoinKey) *JoinKey

NewJoinKey creates a JoinKey task that evaluates the compound join key to allow for parallelized joins:

 source1   ->  JoinKey  ->  hash-route
                                       \
                                        --  join  -->
                                       /
 source2   ->  JoinKey  ->  hash-route
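The join key must hash consistently because both sides of the join route by it: rows with equal keys have to land on the same join partition. The following minimal sketch assumes rows are string maps and uses FNV-1a from hash/fnv as the hash function; the package may use a different hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// joinKey builds a composite key from the values of the join-expression
// columns (hypothetical: rows are plain string maps here).
func joinKey(row map[string]string, cols []string) string {
	parts := make([]string, len(cols))
	for i, c := range cols {
		parts[i] = row[c]
	}
	return strings.Join(parts, "\x00")
}

// route picks a partition for a key. Because the hash is deterministic,
// equal keys from either side of the join always reach the same partition.
func route(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	left := map[string]string{"name": "ann", "dept": "eng"}
	right := map[string]string{"name": "ann", "salary": "100"}

	lp := route(joinKey(left, []string{"name"}), 4)
	rp := route(joinKey(right, []string{"name"}), 4)
	fmt.Println(lp == rp) // true
}
```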

func (*JoinKey) Close

func (m *JoinKey) Close() error

func (*JoinKey) Run

func (m *JoinKey) Run() error

type JoinMerge

type JoinMerge struct {
	*TaskBase
	// contains filtered or unexported fields
}

JoinMerge scans two source tasks for rows, evaluates their keys, and uses the keys for the join.

func NewJoinNaiveMerge

func NewJoinNaiveMerge(ctx *plan.Context, l, r TaskRunner, p *plan.JoinMerge) *JoinMerge

NewJoinNaiveMerge creates a naive parallel join merge that uses Key() as the value to merge two different input channels:

source1   ->
             \
               --  join  -->
             /
source2   ->

Distributed:

source1a  ->                |-> --  join  -->
source1b  -> key-hash-route |-> --  join  -->  reduce ->
source1n  ->                |-> --  join  -->
                            |-> --  join  -->
source2a  ->                |-> --  join  -->
source2b  -> key-hash-route |-> --  join  -->
source2n  ->                |-> --  join  -->
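The following hypothetical sketch shows a naive in-process join merge, as in the first diagram. It drains the left input into an in-memory map by key, then emits one joined row per matching right-side message, which gives inner-join semantics. The Msg type is an assumption standing in for messages whose Key() has already been evaluated. The real task may read both inputs concurrently.

```go
package main

import "fmt"

// Msg is a hypothetical message carrying a precomputed join key.
type Msg struct {
	Key string
	Val string
}

// naiveJoin drains the left channel into memory, grouped by key, then emits
// an inner-join row for each right-side message with a matching key.
func naiveJoin(left, right <-chan Msg) []string {
	byKey := make(map[string][]string)
	for m := range left {
		byKey[m.Key] = append(byKey[m.Key], m.Val)
	}
	var out []string
	for m := range right {
		for _, lv := range byKey[m.Key] {
			out = append(out, lv+"|"+m.Val)
		}
	}
	return out
}

// feed returns a closed, buffered channel holding msgs, simulating a source.
func feed(msgs ...Msg) <-chan Msg {
	ch := make(chan Msg, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

func main() {
	l := feed(Msg{"ann", "eng"}, Msg{"bob", "ops"})
	r := feed(Msg{"ann", "100"}, Msg{"cat", "90"})
	fmt.Println(naiveJoin(l, r)) // [eng|100]
}
```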

func (*JoinMerge) Close

func (m *JoinMerge) Close() error

func (*JoinMerge) Run

func (m *JoinMerge) Run() error

type KeyEvaluator

type KeyEvaluator func(msg schema.Message) driver.Value

type MessageChan

type MessageChan chan schema.Message

type MessageHandler

type MessageHandler func(ctx *plan.Context, msg schema.Message) bool

MessageHandler handles or forwards a message for this Task.

func MakeHandler

func MakeHandler(task TaskRunner) MessageHandler

type Projection

type Projection struct {
	*TaskBase
	// contains filtered or unexported fields
}

Projection Execution Task

func NewProjection

func NewProjection(ctx *plan.Context, p *plan.Projection) *Projection

In-process projections are used when mapping multiple sources together, where additional columns (such as those used in Where, GroupBy, etc.) are projected even if they will not be used in the final projection.

func NewProjectionFinal

func NewProjectionFinal(ctx *plan.Context, p *plan.Projection) *Projection

Final Projections project final select columns for result-writing

func NewProjectionInProcess

func NewProjectionInProcess(ctx *plan.Context, p *plan.Projection) *Projection

In-process projections are used when mapping multiple sources together, where additional columns (such as those used in Where, GroupBy, etc.) are projected even if they will not be used in the final projection.

func NewProjectionLimit

func NewProjectionLimit(ctx *plan.Context, p *plan.Projection) *Projection

NewProjectionLimit only provides a counting/limit projection.

func (*Projection) Close

func (m *Projection) Close() error

Close cleans up and closes channels

func (*Projection) CloseFinal

func (m *Projection) CloseFinal() error

CloseFinal performs additional cleanup after exit.

type RequiresContext

type RequiresContext interface {
	SetContext(ctx *plan.Context)
}

RequiresContext is implemented by source data sources that require a context.

type ResultBuffer

type ResultBuffer struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultBuffer

func NewResultBuffer(ctx *plan.Context, writeTo *[]schema.Message) *ResultBuffer

func (*ResultBuffer) Close

func (m *ResultBuffer) Close() error

func (*ResultBuffer) Copy

func (m *ResultBuffer) Copy() *ResultBuffer

type ResultExecWriter

type ResultExecWriter struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultExecWriter

func NewResultExecWriter(ctx *plan.Context) *ResultExecWriter

func (*ResultExecWriter) Close

func (m *ResultExecWriter) Close() error

func (*ResultExecWriter) Copy

func (*ResultExecWriter) Result

func (m *ResultExecWriter) Result() driver.Result

type ResultWriter

type ResultWriter struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultRows

func NewResultRows(ctx *plan.Context, cols []string) *ResultWriter

func NewResultWriter

func NewResultWriter(ctx *plan.Context) *ResultWriter

func (*ResultWriter) Close

func (m *ResultWriter) Close() error

func (*ResultWriter) Columns

func (m *ResultWriter) Columns() []string

func (*ResultWriter) Copy

func (m *ResultWriter) Copy() *ResultWriter

func (*ResultWriter) Next

func (m *ResultWriter) Next(dest []driver.Value) error

Next implements the Next() method of the database/sql/driver Rows interface.

func (*ResultWriter) Run

func (m *ResultWriter) Run() error

For ResultWriter, Run does not read the input channel; it only watches the stop channels. Messages are not paged through this message channel but through Next(), as defined by database/sql/driver.

type SigChan

type SigChan chan bool

Task channel types

type Source

type Source struct {
	*TaskBase

	Scanner    schema.ConnScanner
	ExecSource ExecutorSource
	JoinKey    KeyEvaluator
	// contains filtered or unexported fields
}

Source scans a data source for rows and feeds them into the runner. Because the source scanner is itself a source, it produces messages via iter.Next() instead of receiving them on an input channel. Examples of sources:

1) table      -- FROM table
2) channels   -- FROM stream
3) join       -- SELECT t1.name, t2.salary
                     FROM employee AS t1
                     INNER JOIN info AS t2
                     ON t1.name = t2.name;
4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;

func NewSource

func NewSource(ctx *plan.Context, p *plan.Source) (*Source, error)

NewSource creates a scanner to read from a data source.

func NewSourceScanner

func NewSourceScanner(ctx *plan.Context, p *plan.Source, scanner schema.ConnScanner) *Source

NewSourceScanner creates a scanner to read from a sub-query data source (join, sub-query, static).

func (*Source) Close

func (m *Source) Close() error

func (*Source) Copy

func (m *Source) Copy() *Source

func (*Source) Run

func (m *Source) Run() error

type Task

type Task interface {
	Run() error
	Close() error
	CloseFinal() error
	Children() []Task // children sub-tasks
	Add(Task) error   // Add a child to this dag
}

exec Tasks are inherently DAGs of tasks implementing Run(), Close(), etc., which allows them to be executed.

func BuildSqlJobPlanned

func BuildSqlJobPlanned(planner plan.Planner, executor Executor, ctx *plan.Context) (Task, error)

BuildSqlJobPlanned creates a job made up of sub-tasks in a DAG that is the plan for executing this query/job.

type TaskBase

type TaskBase struct {
	Ctx     *plan.Context
	Name    string
	Handler MessageHandler
	// contains filtered or unexported fields
}

TaskBase is the base executable task that implements the Task interface. It is embedded into other channel-based task runners.

func NewTaskBase

func NewTaskBase(ctx *plan.Context) *TaskBase

func (*TaskBase) Add

func (m *TaskBase) Add(task Task) error

func (*TaskBase) AddPlan

func (m *TaskBase) AddPlan(task plan.Task) error

func (*TaskBase) Children

func (m *TaskBase) Children() []Task

func (*TaskBase) Close

func (m *TaskBase) Close() error

func (*TaskBase) CloseFinal

func (m *TaskBase) CloseFinal() error

func (*TaskBase) ErrChan

func (m *TaskBase) ErrChan() ErrChan

func (*TaskBase) MessageIn

func (m *TaskBase) MessageIn() MessageChan

func (*TaskBase) MessageInSet

func (m *TaskBase) MessageInSet(ch MessageChan)

func (*TaskBase) MessageOut

func (m *TaskBase) MessageOut() MessageChan

func (*TaskBase) MessageOutSet

func (m *TaskBase) MessageOutSet(ch MessageChan)

func (*TaskBase) Quit

func (m *TaskBase) Quit()

func (*TaskBase) Run

func (m *TaskBase) Run() error

func (*TaskBase) Setup

func (m *TaskBase) Setup(depth int) error

func (*TaskBase) SigChan

func (m *TaskBase) SigChan() SigChan

type TaskParallel

type TaskParallel struct {
	*TaskBase
	// contains filtered or unexported fields
}

TaskParallel is a parallel set of tasks: it starts each child task and offers up an output channel that merges the output of every child:

--> \
--> - ->
--> /

func NewTaskParallel

func NewTaskParallel(ctx *plan.Context) *TaskParallel

func (*TaskParallel) Add

func (m *TaskParallel) Add(task Task) error

func (*TaskParallel) Children

func (m *TaskParallel) Children() []Task

func (*TaskParallel) Close

func (m *TaskParallel) Close() error

func (*TaskParallel) PrintDag

func (m *TaskParallel) PrintDag(depth int)

func (*TaskParallel) Run

func (m *TaskParallel) Run() error

func (*TaskParallel) Setup

func (m *TaskParallel) Setup(depth int) error

type TaskPrinter

type TaskPrinter interface {
	PrintDag(depth int)
}

type TaskRunner

type TaskRunner interface {
	Task
	Setup(depth int) error
	MessageIn() MessageChan
	MessageOut() MessageChan
	MessageInSet(MessageChan)
	MessageOutSet(MessageChan)
	ErrChan() ErrChan
	SigChan() SigChan
	Quit()
}

TaskRunner is the interface for a single task in the DAG of tasks needed to execute a Job. It may have child tasks, and it may be parallel, distributed, etc.
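The following rough sketch shows how channel-based runners can compose sequentially. It uses a hypothetical stripped-down stage type in place of TaskRunner. Each stage's output channel becomes the next stage's input, and closing a channel propagates end-of-stream down the chain.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical, stripped-down runner: it reads from in, applies
// fn, and writes to out. When fn returns false, the message is dropped.
type stage struct {
	in, out chan string
	fn      func(string) (string, bool)
}

func newStage(fn func(string) (string, bool)) *stage {
	return &stage{out: make(chan string), fn: fn}
}

// run forwards messages until in is closed, then closes out so the next
// stage sees end-of-stream.
func (s *stage) run() {
	defer close(s.out)
	for m := range s.in {
		if v, ok := s.fn(m); ok {
			s.out <- v
		}
	}
}

// chain wires each stage's output to the next stage's input, similar in
// spirit to linking one task's MessageOut to the next task's MessageIn.
func chain(src chan string, stages ...*stage) chan string {
	prev := src
	for _, s := range stages {
		s.in = prev
		go s.run()
		prev = s.out
	}
	return prev
}

func main() {
	src := make(chan string)
	go func() {
		defer close(src)
		for _, m := range []string{"ann", "bob", "al"} {
			src <- m
		}
	}()
	where := newStage(func(m string) (string, bool) { return m, strings.HasPrefix(m, "a") })
	project := newStage(func(m string) (string, bool) { return strings.ToUpper(m), true })
	for v := range chain(src, where, project) {
		fmt.Println(v) // ANN, then AL
	}
}
```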

type TaskSequential

type TaskSequential struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewTaskSequential

func NewTaskSequential(ctx *plan.Context) *TaskSequential

func (*TaskSequential) Add

func (m *TaskSequential) Add(task Task) error

func (*TaskSequential) Children

func (m *TaskSequential) Children() []Task

func (*TaskSequential) Close

func (m *TaskSequential) Close() error

func (*TaskSequential) PrintDag

func (m *TaskSequential) PrintDag(depth int)

func (*TaskSequential) Run

func (m *TaskSequential) Run() (err error)

func (*TaskSequential) Setup

func (m *TaskSequential) Setup(depth int) error

type TaskStepper

type TaskStepper struct {
	*TaskBase
}

A TaskStepper is not driven by Run; instead, the end user steps it with explicit Next() calls.

func NewTaskStepper

func NewTaskStepper(ctx *plan.Context) *TaskStepper

func (*TaskStepper) Run

func (m *TaskStepper) Run() error

type Upsert

type Upsert struct {
	*TaskBase
	// contains filtered or unexported fields
}

Upsert task for insert, update, upsert

func NewInsert

func NewInsert(ctx *plan.Context, p *plan.Insert) *Upsert

NewInsert creates an Upsert task that inserts into a data source.

func NewUpdate

func NewUpdate(ctx *plan.Context, p *plan.Update) *Upsert

func NewUpsert

func NewUpsert(ctx *plan.Context, p *plan.Upsert) *Upsert

func (*Upsert) Close

func (m *Upsert) Close() error

func (*Upsert) Run

func (m *Upsert) Run() error

type Where

type Where struct {
	*TaskBase
	// contains filtered or unexported fields
}

Where is a filter that implements the WHERE clause.
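The following minimal sketch shows the filtering behavior. It assumes rows are map[string]int and that the WHERE expression has already been compiled into a Go predicate; the real task evaluates expression trees against schema.Message values. The same shape applies to HAVING when it runs on grouped rows.

```go
package main

import "fmt"

// Row is a hypothetical message: column name -> integer value.
type Row map[string]int

// Predicate stands in for an evaluated WHERE expression.
type Predicate func(Row) bool

// whereFilter forwards only the rows for which the predicate holds and
// closes its output when the input is exhausted.
func whereFilter(in <-chan Row, pred Predicate) <-chan Row {
	out := make(chan Row)
	go func() {
		defer close(out)
		for r := range in {
			if pred(r) {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	in := make(chan Row, 3)
	in <- Row{"age": 17}
	in <- Row{"age": 30}
	in <- Row{"age": 45}
	close(in)
	for r := range whereFilter(in, func(r Row) bool { return r["age"] >= 18 }) {
		fmt.Println(r["age"]) // 30, then 45
	}
}
```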

func NewHaving

func NewHaving(ctx *plan.Context, p *plan.Having) *Where

Having-Filter

func NewWhere

func NewWhere(ctx *plan.Context, p *plan.Where) *Where

Where-Filter

Filters and finals differ because the final does final column aliasing.

func NewWhereFilter

func NewWhereFilter(ctx *plan.Context, sql *rel.SqlSelect) *Where

Where-Filter

Filters and finals differ because the final does final column aliasing.

func NewWhereFinal

func NewWhereFinal(ctx *plan.Context, p *plan.Where) *Where
