exec

package
v0.0.0-...-daaaa79 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 26, 2015 License: MIT Imports: 16 Imported by: 0

README

Runtime

A Job is a DAG of Tasks; execution means running that DAG.

  • ExecMaster: each DAG has a single master for its job
  • Planner: creates a DAG of tasks
  • TaskRunner: runs a single node of a set of tasks and communicates between child tasks
  • Datasource: supplies data to a task
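The runtime pieces listed above can be pictured as goroutines connected by channels. The following is a minimal, self-contained sketch of that idea and does not use this package's types: `message`, `stage`, `runPipeline`, `upper`, and `dropEmpty` are hypothetical names, and the buffer size of 50 only echoes `ItemDefaultChannelSize`.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for a row flowing between tasks.
type message string

// stage is a hypothetical task: it reads from in, writes to out, and
// closes out once in is exhausted.
type stage func(in <-chan message, out chan<- message)

// runPipeline wires the stages into a linear DAG, feeds it the input
// messages, and returns everything read from the final (drain) channel.
func runPipeline(input []message, stages ...stage) []message {
	src := make(chan message, 50)
	go func() {
		defer close(src)
		for _, m := range input {
			src <- m
		}
	}()

	var in <-chan message = src
	for _, s := range stages {
		out := make(chan message, 50)
		go s(in, out) // each task's output becomes the next task's input
		in = out
	}

	var results []message
	for m := range in {
		results = append(results, m)
	}
	return results
}

// upper is an example stage that upper-cases every message.
func upper(in <-chan message, out chan<- message) {
	defer close(out)
	for m := range in {
		out <- message(strings.ToUpper(string(m)))
	}
}

// dropEmpty is an example stage that filters out empty messages.
func dropEmpty(in <-chan message, out chan<- message) {
	defer close(out)
	for m := range in {
		if m != "" {
			out <- m
		}
	}
}

func main() {
	fmt.Println(runPipeline([]message{"a", "", "b"}, dropEmpty, upper))
}
```

Closing each output channel when its input is exhausted lets completion propagate down the chain, so the caller can simply range over the last channel.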

Coercion

| Go Types | Value types |
|---|---|
| int(8,16,32,64) | IntValue |
| float(32,64) | NumberValue |
| string | StringValue |
| []string | StringsValue |
| bool | BoolValue |
| map[string]int | MapStringIntValue |

| From | ToInt | ToString | ToBool | ToNumber | MapInt | MapString |
|---|---|---|---|---|---|---|
| int(8,16,32,64) | y | y | y | y | N | |
| uint(8,16,32,64) | y | y | y | y | N | |
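The Go-type-to-value-type pairing in the first table can be expressed as a type switch. This is only an illustrative sketch: `valueTypeName` is a hypothetical helper that returns the names from the table as strings, it does not construct the package's actual value types, and treating plain `int` like the sized integers is an assumption.

```go
package main

import "fmt"

// valueTypeName returns the value-type name that the table pairs with the
// Go type of v, or "" when the table has no entry for that type.
// Hypothetical helper for illustration only.
func valueTypeName(v interface{}) string {
	switch v.(type) {
	case int, int8, int16, int32, int64: // plain int is an assumption
		return "IntValue"
	case float32, float64:
		return "NumberValue"
	case string:
		return "StringValue"
	case []string:
		return "StringsValue"
	case bool:
		return "BoolValue"
	case map[string]int:
		return "MapStringIntValue"
	}
	return ""
}

func main() {
	fmt.Println(valueTypeName(int64(42)))
	fmt.Println(valueTypeName(3.14))
	fmt.Println(valueTypeName([]string{"a", "b"}))
	fmt.Println(valueTypeName(map[string]int{"a": 1}))
}
```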

Documentation

Constants

const (
	ItemDefaultChannelSize = 50
)
const (
	MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)
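`MysqlTimeFormat` is a layout string for Go's `time` package with a fixed nine-digit fractional second. The sketch below copies the layout into a local constant so that it runs on its own; `mysqlTimeFormat`, `formatMysqlTime`, and `roundTrip` are hypothetical names, and how the package itself uses the constant is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// mysqlTimeFormat copies the layout of the MysqlTimeFormat constant.
const mysqlTimeFormat = "2006-01-02 15:04:05.000000000"

// formatMysqlTime renders t using the layout.
func formatMysqlTime(t time.Time) string {
	return t.Format(mysqlTimeFormat)
}

// roundTrip parses s with the layout and formats the result again.
func roundTrip(s string) (string, error) {
	t, err := time.Parse(mysqlTimeFormat, s)
	if err != nil {
		return "", err
	}
	return formatMysqlTime(t), nil
}

func main() {
	t := time.Date(2015, time.June, 26, 12, 30, 45, 123456789, time.UTC)
	fmt.Println(formatMysqlTime(t)) // 2015-06-26 12:30:45.123456789

	s, err := roundTrip("2015-06-26 12:30:45.000000000")
	fmt.Println(s, err)
}
```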

Variables

var (
	ShuttingDownError = fmt.Errorf("Received Shutdown Signal")
)

Functions

func RegisterSqlDriver

func RegisterSqlDriver()

func RunJob

func RunJob(conf *datasource.RuntimeConfig, tasks Tasks) error

RunJob runs a SQL job by running each task to completion.

func SetupTasks

func SetupTasks(tasks Tasks) error

Types

type Context

type Context struct {
	DisableRecover bool
	// contains filtered or unexported fields
}

func (*Context) Recover

func (m *Context) Recover()

type ErrChan

type ErrChan chan error

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

JobBuilder is a simple, single-source job executor. Smarter ones can be created, but this is a basic implementation.

func NewJobBuilder

func NewJobBuilder(rtConf *datasource.RuntimeConfig, connInfo string) *JobBuilder

NewJobBuilder creates a JobBuilder.

connInfo is the connection string info for the original connection.

func (*JobBuilder) VisitDelete

func (m *JobBuilder) VisitDelete(stmt *expr.SqlDelete) (interface{}, error)

func (*JobBuilder) VisitDescribe

func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (interface{}, error)

func (*JobBuilder) VisitInsert

func (m *JobBuilder) VisitInsert(stmt *expr.SqlInsert) (interface{}, error)

func (*JobBuilder) VisitJoin

func (m *JobBuilder) VisitJoin(stmt *expr.SqlSource) (interface{}, error)

func (*JobBuilder) VisitPreparedStmt

func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (interface{}, error)

func (*JobBuilder) VisitSelect

func (m *JobBuilder) VisitSelect(stmt *expr.SqlSelect) (interface{}, error)

func (*JobBuilder) VisitShow

func (m *JobBuilder) VisitShow(stmt *expr.SqlShow) (interface{}, error)

func (*JobBuilder) VisitSubselect

func (m *JobBuilder) VisitSubselect(stmt *expr.SqlSource) (interface{}, error)

func (*JobBuilder) VisitUpdate

func (m *JobBuilder) VisitUpdate(stmt *expr.SqlUpdate) (interface{}, error)

func (*JobBuilder) VisitUpsert

func (m *JobBuilder) VisitUpsert(stmt *expr.SqlUpsert) (interface{}, error)

type JobRunner

type JobRunner interface {
	Setup() error
	Run() error
	Close() error
}

JobRunner is the main runtime interface for running a SQL job.
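A caller of this interface would typically go through Setup, Run, and Close in that order. The sketch below repeats the interface so that it compiles on its own; `runToCompletion` and `recordingJob` are hypothetical, and closing only after a successful Setup is an assumption about the intended lifecycle, not something this page states.

```go
package main

import (
	"errors"
	"fmt"
)

// JobRunner repeats the interface from the listing above so that this
// sketch is self-contained.
type JobRunner interface {
	Setup() error
	Run() error
	Close() error
}

// runToCompletion is a hypothetical helper: Setup, then Run, and always
// Close once Setup has succeeded (assumed lifecycle).
func runToCompletion(j JobRunner) (err error) {
	if err = j.Setup(); err != nil {
		return err
	}
	defer func() {
		// Report the Close error only if Run itself succeeded.
		if cerr := j.Close(); err == nil {
			err = cerr
		}
	}()
	return j.Run()
}

// recordingJob is a fake JobRunner that records the order of calls.
type recordingJob struct {
	calls  []string
	runErr error
}

func (r *recordingJob) Setup() error { r.calls = append(r.calls, "Setup"); return nil }
func (r *recordingJob) Run() error   { r.calls = append(r.calls, "Run"); return r.runErr }
func (r *recordingJob) Close() error { r.calls = append(r.calls, "Close"); return nil }

func main() {
	ok := &recordingJob{}
	fmt.Println(runToCompletion(ok), ok.calls)

	failing := &recordingJob{runErr: errors.New("boom")}
	fmt.Println(runToCompletion(failing), failing.calls)
}
```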

type MessageChan

type MessageChan chan datasource.Message

type MessageHandler

type MessageHandler func(ctx *Context, msg datasource.Message) bool

func MakeHandler

func MakeHandler(task TaskRunner) MessageHandler

type Projection

type Projection struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewProjection

func NewProjection(sqlSelect *expr.SqlSelect) *Projection

type ResultBuffer

type ResultBuffer struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultBuffer

func NewResultBuffer(writeTo *[]datasource.Message) *ResultBuffer

func (*ResultBuffer) Close

func (m *ResultBuffer) Close() error

func (*ResultBuffer) Copy

func (m *ResultBuffer) Copy() *ResultBuffer

type ResultWriter

type ResultWriter struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultRows

func NewResultRows(cols []string) *ResultWriter

func NewResultWriter

func NewResultWriter() *ResultWriter

func (*ResultWriter) Close

func (m *ResultWriter) Close() error

func (*ResultWriter) Columns

func (m *ResultWriter) Columns() []string

func (*ResultWriter) Copy

func (m *ResultWriter) Copy() *ResultWriter

func (*ResultWriter) Next

func (m *ResultWriter) Next(dest []driver.Value) error

Note: this implements the Next method of the database/sql/driver Rows interface.
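For readers unfamiliar with that contract: `driver.Rows` requires `Columns`, `Close`, and `Next`, and `Next` should return `io.EOF` when no rows remain. The sketch below is a hypothetical in-memory implementation (`memRows`, `newSampleRows`, and `countRows` are illustrative names) and is not how ResultWriter is implemented.

```go
package main

import (
	"database/sql/driver"
	"fmt"
	"io"
)

// memRows is a hypothetical in-memory driver.Rows.
type memRows struct {
	cols []string
	rows [][]driver.Value
	pos  int
}

// Compile-time check that memRows satisfies driver.Rows.
var _ driver.Rows = (*memRows)(nil)

func (r *memRows) Columns() []string { return r.cols }
func (r *memRows) Close() error      { return nil }

// Next copies the current row into dest and advances. It returns io.EOF
// once all rows have been consumed.
func (r *memRows) Next(dest []driver.Value) error {
	if r.pos >= len(r.rows) {
		return io.EOF
	}
	copy(dest, r.rows[r.pos])
	r.pos++
	return nil
}

// newSampleRows builds two sample rows.
func newSampleRows() *memRows {
	return &memRows{
		cols: []string{"name", "salary"},
		rows: [][]driver.Value{
			{"alice", int64(100)},
			{"bob", int64(90)},
		},
	}
}

// countRows steps through rows the way a caller of Next would.
func countRows(r *memRows) int {
	dest := make([]driver.Value, len(r.Columns()))
	n := 0
	for r.Next(dest) == nil {
		n++
	}
	return n
}

func main() {
	fmt.Println(countRows(newSampleRows()))
}
```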

func (*ResultWriter) Run

func (m *ResultWriter) Run(ctx *Context) error

For ResultWriter, we are not paging through messages using the message channel; instead we use Next() as defined by database/sql/driver. Run therefore doesn't read the input channel, it just watches the stop channels.
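Watching stop channels without reading input usually comes down to a single `select`. This is a minimal sketch under that assumption; `waitForStop` and `errStopped` are hypothetical, and which channels the real Run method watches is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped is a hypothetical error used only by this sketch.
var errStopped = errors.New("stopped with error")

// waitForStop blocks until a stop signal or an error arrives. It never
// touches a message channel.
func waitForStop(sig <-chan bool, errs <-chan error) error {
	select {
	case <-sig:
		return nil
	case err := <-errs:
		return err
	}
}

func main() {
	sig := make(chan bool, 1)
	errs := make(chan error, 1)

	sig <- true
	fmt.Println(waitForStop(sig, errs))

	errs <- errStopped
	fmt.Println(waitForStop(sig, errs))
}
```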

type SigChan

type SigChan chan bool

type Source

type Source struct {
	*TaskBase
	// contains filtered or unexported fields
}

Source scans a data source for rows and feeds them into the runner. Because it is a source, it obtains messages by calling iter.Next() rather than having them sent on its input channel.

1) table      -- FROM table
2) channels   -- FROM stream
3) join       -- SELECT t1.name, t2.salary
                     FROM employee AS t1
                     INNER JOIN info AS t2
                     ON t1.name = t2.name;
4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;
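The pull-then-push behaviour described for Source can be sketched as follows. Nothing here is this package's API: `sliceIter`, `runSource`, and `scanAll` are hypothetical, rows are plain strings, and stopping on a signal channel is an assumption.

```go
package main

import "fmt"

// sliceIter is a hypothetical pull-style scanner over in-memory rows.
type sliceIter struct {
	rows []string
	pos  int
}

// Next returns the next row, or false once the rows are exhausted.
func (it *sliceIter) Next() (string, bool) {
	if it.pos >= len(it.rows) {
		return "", false
	}
	row := it.rows[it.pos]
	it.pos++
	return row, true
}

// runSource pulls rows with Next and pushes them to out; it has no input
// channel. It stops early if a signal arrives on sig and always closes out.
func runSource(it *sliceIter, out chan<- string, sig <-chan bool) {
	defer close(out)
	for {
		row, ok := it.Next()
		if !ok {
			return
		}
		select {
		case out <- row:
		case <-sig:
			return
		}
	}
}

// scanAll runs a source over rows and collects everything it emits.
func scanAll(rows []string) []string {
	out := make(chan string, 50)
	sig := make(chan bool)
	go runSource(&sliceIter{rows: rows}, out, sig)

	var got []string
	for row := range out {
		got = append(got, row)
	}
	return got
}

func main() {
	fmt.Println(scanAll([]string{"row1", "row2", "row3"}))
}
```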

func NewSource

func NewSource(from *expr.SqlSource, source datasource.Scanner) *Source

NewSource returns a scanner that reads from a data source.

func (*Source) Close

func (m *Source) Close() error

func (*Source) Copy

func (m *Source) Copy() *Source

func (*Source) Run

func (m *Source) Run(context *Context) error

type SourceJoin

type SourceJoin struct {
	*TaskBase
	// contains filtered or unexported fields
}

SourceJoin scans a data source for rows and feeds them into the runner, for join sources:

  1. join -- SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name;
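To make the example query concrete, here is a sketch of an inner join computed with a hash join, one common technique. This page does not say which join algorithm SourceJoin uses, so the algorithm, the `row` type, and `innerJoin` are all assumptions made for illustration.

```go
package main

import "fmt"

// row is a hypothetical representation of one record.
type row map[string]string

// innerJoin performs a hash join: it indexes the right side by key, then
// probes that index once per left row. Output columns are prefixed with
// t1. and t2. to mirror the aliases in the example query.
func innerJoin(left, right []row, key string) []row {
	index := make(map[string][]row)
	for _, r := range right {
		index[r[key]] = append(index[r[key]], r)
	}

	var out []row
	for _, l := range left {
		for _, r := range index[l[key]] {
			merged := row{}
			for k, v := range l {
				merged["t1."+k] = v
			}
			for k, v := range r {
				merged["t2."+k] = v
			}
			out = append(out, merged)
		}
	}
	return out
}

func main() {
	employee := []row{{"name": "alice"}, {"name": "bob"}}
	info := []row{{"name": "alice", "salary": "100"}}

	for _, r := range innerJoin(employee, info, "name") {
		fmt.Println(r["t1.name"], r["t2.salary"])
	}
}
```

Rows without a match on the other side (here, bob) are dropped, which is what distinguishes an inner join from an outer join.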

func NewSourceJoin

func NewSourceJoin(builder expr.SubVisitor, leftFrom, rightFrom *expr.SqlSource, conf *datasource.RuntimeConfig) (*SourceJoin, error)

NewSourceJoin returns a scanner that reads from the joined data sources.

func (*SourceJoin) Close

func (m *SourceJoin) Close() error

func (*SourceJoin) Copy

func (m *SourceJoin) Copy() *Source

func (*SourceJoin) Run

func (m *SourceJoin) Run(context *Context) error

type SourcePlan

type SourcePlan struct {
	SqlSource *expr.SqlSource
}

func NewSourcePlan

func NewSourcePlan(sql *expr.SqlSource) *SourcePlan

func (*SourcePlan) Accept

func (m *SourcePlan) Accept(sub expr.SubVisitor) (interface{}, error)

func (*SourcePlan) VisitJoin

func (m *SourcePlan) VisitJoin(stmt *expr.SqlSource) (interface{}, error)

func (*SourcePlan) VisitSubselect

func (m *SourcePlan) VisitSubselect(stmt *expr.SqlSource) (interface{}, error)

type SqlJob

type SqlJob struct {
	Tasks Tasks
	Stmt  expr.SqlStatement
	Conf  *datasource.RuntimeConfig
}

SqlJob is a DAG of tasks for SQL execution.

func BuildSqlJob

func BuildSqlJob(conf *datasource.RuntimeConfig, connInfo, sqlText string) (*SqlJob, error)

BuildSqlJob creates a job made up of sub-tasks in a DAG that is the plan for execution of this query/job.

func (*SqlJob) Close

func (m *SqlJob) Close() error

func (*SqlJob) DrainChan

func (m *SqlJob) DrainChan() MessageChan

The drain is the last output channel, i.e. the output channel of the last task.

func (*SqlJob) Run

func (m *SqlJob) Run() error

func (*SqlJob) Setup

func (m *SqlJob) Setup() error

type TaskBase

type TaskBase struct {
	TaskType string
	Handler  MessageHandler
	// contains filtered or unexported fields
}

func NewTaskBase

func NewTaskBase(taskType string) *TaskBase

func (*TaskBase) Children

func (m *TaskBase) Children() Tasks

func (*TaskBase) Close

func (m *TaskBase) Close() error

func (*TaskBase) ErrChan

func (m *TaskBase) ErrChan() ErrChan

func (*TaskBase) MessageIn

func (m *TaskBase) MessageIn() MessageChan

func (*TaskBase) MessageInSet

func (m *TaskBase) MessageInSet(ch MessageChan)

func (*TaskBase) MessageOut

func (m *TaskBase) MessageOut() MessageChan

func (*TaskBase) MessageOutSet

func (m *TaskBase) MessageOutSet(ch MessageChan)

func (*TaskBase) Run

func (m *TaskBase) Run(ctx *Context) error

func (*TaskBase) SigChan

func (m *TaskBase) SigChan() SigChan

func (*TaskBase) Type

func (m *TaskBase) Type() string

type TaskRunner

type TaskRunner interface {
	Children() Tasks
	Type() string
	MessageIn() MessageChan
	MessageOut() MessageChan
	MessageInSet(MessageChan)
	MessageOutSet(MessageChan)
	ErrChan() ErrChan
	SigChan() SigChan
	Run(ctx *Context) error
	Close() error
}

TaskRunner is the interface for a single dependent task in the DAG of tasks necessary to execute a Job.

  • it may have child tasks
  • it may be parallel, distributed, etc.

type TaskStepper

type TaskStepper struct {
	*TaskBase
}

With a TaskStepper we don't call Run; rather, the end user makes explicit Next() calls.

func NewTaskStepper

func NewTaskStepper(taskType string) *TaskStepper

func (*TaskStepper) Run

func (m *TaskStepper) Run(ctx *Context) error

type Tasks

type Tasks []TaskRunner

func (*Tasks) Add

func (m *Tasks) Add(task TaskRunner)

Add a child Task

type Visitor

type Visitor interface {
	VisitScan(v interface{}) (interface{}, error)
}

exec.Visitor defines the standard SQL Visit() pattern for creating a job plan from SQL statements.

An implementation of Visitor will be able to execute/run a Statement:

  • inproc: i.e., in process
  • distributed: i.e., run this job across multiple servers
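The visit pattern referred to here is double dispatch: each statement type calls the matching Visit method on the visitor, and the visitor returns a plan. The sketch below shows the shape of that pattern only. `statement`, `visitor`, `selectStmt`, `deleteStmt`, `planBuilder`, and `plan` are hypothetical, and the task order it produces (source, where, projection) is an assumption rather than this package's planner output.

```go
package main

import (
	"errors"
	"fmt"
)

// visitor is a hypothetical, reduced visitor with one method per statement.
type visitor interface {
	VisitSelect(s *selectStmt) (interface{}, error)
	VisitDelete(d *deleteStmt) (interface{}, error)
}

// statement is anything that can dispatch itself to a visitor.
type statement interface {
	Accept(v visitor) (interface{}, error)
}

type selectStmt struct{ From, Where string }
type deleteStmt struct{ From string }

func (s *selectStmt) Accept(v visitor) (interface{}, error) { return v.VisitSelect(s) }
func (d *deleteStmt) Accept(v visitor) (interface{}, error) { return v.VisitDelete(d) }

// planBuilder is a hypothetical visitor that turns a statement into an
// ordered list of task names.
type planBuilder struct{}

func (planBuilder) VisitSelect(s *selectStmt) (interface{}, error) {
	tasks := []string{"source:" + s.From}
	if s.Where != "" {
		tasks = append(tasks, "where:"+s.Where)
	}
	return append(tasks, "projection"), nil
}

func (planBuilder) VisitDelete(d *deleteStmt) (interface{}, error) {
	return nil, errors.New("delete is not supported in this sketch")
}

// plan runs the visitor over a statement and returns the task names.
func plan(s statement) ([]string, error) {
	out, err := s.Accept(planBuilder{})
	if err != nil {
		return nil, err
	}
	return out.([]string), nil
}

func main() {
	fmt.Println(plan(&selectStmt{From: "employee", Where: "salary > 10"}))
	fmt.Println(plan(&deleteStmt{From: "employee"}))
}
```

Swapping in a different visitor implementation, for example one that plans distributed execution, would not require changes to the statement types.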

type Where

type Where struct {
	*TaskBase
	// contains filtered or unexported fields
}

Where is a scanner that filters by a WHERE clause.

func NewWhere

func NewWhere(where expr.Node, stmt *expr.SqlSelect) *Where
