gotlin

package module
v0.0.0-...-74d777d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2022 License: Apache-2.0 Imports: 30 Imported by: 0

README

Gotlin

Build Status codecov Go Report Card GoDoc

Gotlin is a distributed computing framework, which is similar to Apache MapReduce. It is a lightweight and customizable computing framework specially designed for Golang. It is divided into service nodes, computing nodes and clients. The client submits tasks to the service nodes, and the tasks are divided into multiple instructions and form a directed acyclic graph according to the dependencies of the instructions. The service node will schedule the instructions to the computing node, and when all the instructions are calculated, the service node will return the result of the task to the client. At this point, a task ends. The gotlin framework has a rich set of built-in instructions, including basic arithmetic instructions, logical instructions, data instructions [immediate values, database input and output], collection instructions [intersection, union, difference], table instructions [join, union, filter, grouping, sorting]. You can also customize your own instruction, register the computing node that can process the instruction to the service node, and the instruction can participate in the calculation. Hope you enjoy it.

*Important: The core functions of gotlin1.0rc1 have been completed. After confirming the function signature and improving the unit test coverage, the stable version will be launched.

Installation

Use go get.

$ go get -u github.com/yaoguais/gotlin

Quick start

Start gotlin server.

$ gotlin start

Start a compute node with a built-in instruction set.

$ gotlin compute

Submit a task "(1+2)*4" to the service node.

$ gotlin submit --program="@program.json"

Output: Program evaluates to 12

Contents

Architecture

Gotlin Architecture Diagram

Benchmark and display metrics using Prometheus and visualize using grafana

First compile the executable.

CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o gotlin cmd/gotlin/main.go 

Then use docker-compose to compile the image and run.

docker-compose build --no-cache
docker-compose up -d

Of course you need to install docker-compose.

Then you can log in to the grafana web ui to view the system metrics. The login account is admin/admin.

Open from browser http://localhost:3000/d/5LryXfunz/gotlin-monitoring

Due to the limitation of docker-compose, if there is currently only one Executor, run the following command to increase the number of Executors to 8.

docker-compose --compatibility up -d

Finally, submit some computing tasks to the service node and check the changes of the indicators.

docker-compose exec client gotlin submit --server="gotlin:9527" --program="@program.json" --fork=1000 --concurrency=8

The content of the monitoring display is as follows.

Gotlin Monitor

You can do your own benchmarks by referring to the above operations. Best of all, always do your own benchmarks.

Submit a task to the service node

func main() {
	// Perform an arithmetic calculation "( 1 + 2 ) * 4", the expected result is 12
	i1 := NewInstruction().ChangeImmediateValue(1)
	i2 := NewInstruction().ChangeImmediateValue(2)
	i3 := NewInstruction().ChangeToArithmetic(OpCodeAdd)
	i4 := NewInstruction().ChangeImmediateValue(4)
	i5 := NewInstruction().ChangeToArithmetic(OpCodeMul)
	ins := []Instructioner{i1, i2, i3, i4, i5}

	p := NewProgram()
	for _, in := range ins {
		p = p.AddInstruction(in.Instruction().ID)
	}

	d := NewInstructionDAG()
	ids := []InstructionID{}
	for _, v := range ins {
		ids = append(ids, v.Instruction().ID)
	}
	d.Add(ids...)
	d.AttachChildren(i3.ID, i1.ID, i2.ID)
	d.AttachChildren(i5.ID, i3.ID, i4.ID)

	core := 8
	p = p.ChangeProcessor(NewDAGProcessorContext(d, core))

	c, _ := NewClient()
	s, _ := c.RequestScheduler(context.Background(), RequestSchedulerOption{})
	rp := RunProgramOption{SchedulerID: s, Program: p, Instructions: ins}
	c.RunProgram(context.Background(), rp)

	ch, _ := c.WaitResult(context.Background(), WaitResultOption{IDs: []ProgramID{p.ID}})
	wr := <-ch
	fmt.Printf("Program: %s, result %v\n", wr.ID, wr.Result)
}

Design a compute node with a custom instruction set

func main() {
	instructionSet := NewInstructionSet()
	customInstructionHandler := InstructionHandler{
		OpCode: OpCode("RETURN9527"),
		Executor: func(ctx context.Context, op Instruction,
			args ...Instruction) (InstructionResult, error) {
			return NewRegisterResult(9527), nil
		},
	}
	instructionSet.Register(customInstructionHandler)

	c, _ := NewClient(WithClientInstructionSet(instructionSet))
	c.RegisterExecutor(context.Background(), RegisterExecutorOption{
		ID:     NewExecutorID(),
		Labels: NewLabels(OpCodeLabelKey, "RETURN9527"),
	})
	_ = c.StartComputeNode(context.Background(), StartComputeNodeOption{})
}

Intersect the results of the query in the database

func main() {
	// Compute the intersection of collection ['C1','C3'] and ['C2','C3'], the desired result is ['C3']
	// CREATE TABLE IF NOT EXISTS test_collections(
	//   id int(10) PRIMARY KEY, name varchar(50) NOT NULL, score decimal(8,2) NOT NULL);
	// INSERT INTO test_collections VALUES(1, 'C1', 0.2), (2, 'C2', 0.2);
	// INSERT INTO test_collections VALUES(3, 'C3', 1.2), (4, 'C3', 2.4);
	driver := "mysql"
	dsn := "root:root@tcp(127.0.0.1:3306)/gotlin?charset=utf8mb4&parseTime=True&loc=Local"
	query1 := "select name from test_collections where id IN (1, 3)"
	query2 := "select name from test_collections where id IN (2, 4)"
	converters := []QueryConverter{QueryConverterFlat}
	d1 := NewDatabaseInput(driver, dsn, query1, converters)
	i1 := NewInstruction().ChangeDatabaseInput(d1)
	d2 := NewDatabaseInput(driver, dsn, query2, converters)
	i2 := NewInstruction().ChangeDatabaseInput(d2)
	i3 := NewInstruction().ChangeToArithmetic(OpCodeIntersect)
	ins := []Instructioner{i1, i2, i3}

	p := NewProgram()
	for _, in := range ins {
		p = p.AddInstruction(in.Instruction().ID)
	}

	d := NewInstructionDAG()
	ids := []InstructionID{}
	for _, v := range ins {
		ids = append(ids, v.Instruction().ID)
	}
	d.Add(ids...)
	d.AttachChildren(i3.ID, i1.ID, i2.ID)

	core := 8
	p = p.ChangeProcessor(NewDAGProcessorContext(d, core))

	c, _ := NewClient()
	s, _ := c.RequestScheduler(context.Background(), RequestSchedulerOption{})
	rp := RunProgramOption{SchedulerID: s, Program: p, Instructions: ins}
	c.RunProgram(context.Background(), rp)

	ch, _ := c.WaitResult(context.Background(), WaitResultOption{IDs: []ProgramID{p.ID}})
	wr := <-ch
	fmt.Printf("Program: %s, result %v\n", wr.ID, wr.Result)
}

Use as an embedded library and an in-memory database

func main() {
	// Perform an arithmetic calculation "( 1 + 2 ) * ( 5 - 1 )", the expected result is 12
	g, _ := NewGotlin(WithServerExecutor(true), WithEnableServer(false))

	i1 := NewInstruction().ChangeImmediateValue(1)
	i2 := NewInstruction().ChangeImmediateValue(2)
	i3 := NewInstruction().ChangeToArithmetic(OpCodeAdd)
	i4 := NewInstruction().ChangeImmediateValue(5)
	i5 := NewInstruction().ChangeImmediateValue(1)
	i6 := NewInstruction().ChangeToArithmetic(OpCodeSub)
	i7 := NewInstruction().ChangeToArithmetic(OpCodeMul)
	ins := []Instructioner{i1, i2, i3, i4, i5, i6, i7}

	p := NewProgram()
	for _, in := range ins {
		p = p.AddInstruction(in.Instruction().ID)
	}

	d := NewInstructionDAG()
	ids := []InstructionID{}
	for _, v := range ins {
		ids = append(ids, v.Instruction().ID)
	}
	d.Add(ids...)
	d.AttachChildren(i3.ID, i1.ID, i2.ID)
	d.AttachChildren(i6.ID, i4.ID, i5.ID)
	d.AttachChildren(i7.ID, i3.ID, i6.ID)

	p = p.ChangeProcessor(NewDAGProcessorContext(d, 8))
	p, _ = p.ChangeState(StateReady)
	s, _ := g.RequestScheduler(context.Background(), NewSchedulerOption())
	result, _ := g.RunProgramSync(context.Background(), s, p, ins)
	fmt.Printf("Program: %s, result %v\n", p.ID, result)
}

License

Copyright 2013 Mir Ikram Uddin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Thanks to contributors

@seboghpub @sebogh

Documentation

Index

Constants

View Source
const EmptyHost = Host("")
View Source
const OpCodeLabelKey = "opcode"
View Source
const TimestampZero = Timestamp(0)

Variables

View Source
var (
	ErrFind       = newError("Find")
	ErrNotFound   = newError("Not found")
	ErrConflict   = newError("Conflict")
	ErrPersistent = newError("Persistent")
)

Repository Errors

View Source
var (
	ErrConnect          = newError("Connect")
	ErrRequest          = newError("Request")
	ErrExitUnexpectedly = newError("Exit unexpectedly")
	ErrConverter        = newError("Converter")
	ErrNilPointer       = newError("NilPointer")
)

Client Errors

View Source
var (
	ErrResponse = newError("Response")
	ErrReceive  = newError("Receive")
	ErrSend     = newError("Send")
)

Service Errors

View Source
var (
	ErrRegisterResult   = newError("Only register result is supported for arithmetic instructions")
	ErrArithmeticOpCode = newError("Only ADD/SUB/MUL/DIV are supported for arithmetic opcode")
)

InstructionSet Errors

View Source
var (
	ErrProcessorType         = newError("Processor type is invalid")
	ErrProgramState          = newError("Program state is invalid")
	ErrProgramCode           = newError("Program code is invalid")
	ErrProgramResult         = newError("Program result is invalid")
	ErrInstructionState      = newError("Instruction state is invalid")
	ErrInstructions          = newError("Instructions is invalid")
	ErrSchedulerDuplicated   = newError("Scheduler duplicated")
	ErrProgramDuplicated     = newError("Program duplicated")
	ErrInstructionDuplicated = newError("Instruction duplicated")
)

Model Errors

View Source
var (
	WaitInstructionHandler = InstructionHandler{
		OpCode:   OpCodeWait,
		Executor: ExecuteWaitInstruction,
	}
	MoveInstructionHandler = InstructionHandler{
		OpCode:   OpCodeMove,
		Executor: ExecuteMoveInstruction,
	}
	InputInstructionHandler = InstructionHandler{
		OpCode:   OpCodeIn,
		Executor: ExecuteInputInstruction,
	}
	AddInstructionHandler = InstructionHandler{
		OpCode:   OpCodeAdd,
		Executor: ExecuteArithmeticInstruction,
	}
	SubInstructionHandler = InstructionHandler{
		OpCode:   OpCodeSub,
		Executor: ExecuteArithmeticInstruction,
	}
	MulInstructionHandler = InstructionHandler{
		OpCode:   OpCodeMul,
		Executor: ExecuteArithmeticInstruction,
	}
	DivInstructionHandler = InstructionHandler{
		OpCode:   OpCodeDiv,
		Executor: ExecuteArithmeticInstruction,
	}
	IntersectInstructionHandler = InstructionHandler{
		OpCode:   OpCodeIntersect,
		Executor: ExecuteCollectionInstruction,
	}
	UnionInstructionHandler = InstructionHandler{
		OpCode:   OpCodeUnion,
		Executor: ExecuteCollectionInstruction,
	}
	DiffInstructionHandler = InstructionHandler{
		OpCode:   OpCodeDiff,
		Executor: ExecuteCollectionInstruction,
	}
)
View Source
var (
	InstructionTableName = "instructions"
	ProgramTableName     = "programs"
	SchedulerTableName   = "schedulers"
	ExecutorTableName    = "executors"
)
View Source
var DatabaseFactory = func(driver, dsn string) (*gorm.DB, error) {
	switch strings.ToLower(driver) {
	case "mysql":
		return gorm.Open(mysql.Open(dsn), &gorm.Config{})
	case "postgres":
		return gorm.Open(postgres.Open(dsn), &gorm.Config{})
	case "clickhouse":
		return gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
	default:
		return nil, newErrorf("Database driver %s not supported", driver)
	}
}
View Source
var EmptyQueryResult = struct{}{}
View Source
var EnvPrefix = "GOTLIN_"
View Source
var (
	ErrExecutorNotFound = newError("Executor not found")
)

Executor Errors

View Source
var (
	ErrNoMoreInstruction = newError("No more instruction found")
)

Processor Errors

View Source
var (
	ErrUndoubted = newError("Undoubted")
)

Common Errors

View Source
var (
	MarshalStructs = []interface{}{
		Operand{},
		EmptyInput{},
		Immediate{},
		DatabaseInput{},
		InstructionResult{},
		EmptyResult{},
		RegisterResult{},
		[]interface{}{},
	}
)
View Source
var ReadWriteInstructions = map[OpCode]bool{
	OpCodeIn:   true,
	OpCodeMove: true,
}

Functions

func IsReadWriteInstruction

func IsReadWriteInstruction(v Instruction) bool

func SetLogger

func SetLogger(l *logrus.Logger)

func SetMarshalType

func SetMarshalType(v MarshalType)

Types

type Client

type Client struct {
	GRPCOption     []grpc.DialOption
	TargetAddress  string
	InstructionSet *InstructionSet
	// contains filtered or unexported fields
}

func NewClient

func NewClient(options ...ClientOption) (*Client, error)

func (*Client) RegisterExecutor

func (c *Client) RegisterExecutor(ctx context.Context, r RegisterExecutorOption) error

func (*Client) RequestScheduler

func (c *Client) RequestScheduler(ctx context.Context, r RequestSchedulerOption) (SchedulerID, error)

func (*Client) RunProgram

func (c *Client) RunProgram(ctx context.Context, r RunProgramOption) error

func (*Client) Shutdown

func (c *Client) Shutdown() error

func (*Client) StartComputeNode

func (c *Client) StartComputeNode(ctx context.Context, r StartComputeNodeOption) error

func (*Client) UnregisterExecutor

func (c *Client) UnregisterExecutor(ctx context.Context, r UnregisterExecutorOption) error

func (*Client) WaitResult

func (c *Client) WaitResult(ctx context.Context, r WaitResultOption) (chan ProgramResult, error)

type ClientID

type ClientID = ID

func NewClientID

func NewClientID() ClientID

type ClientOption

type ClientOption func(*Client)

func WithClientGRPCOptions

func WithClientGRPCOptions(options ...grpc.DialOption) ClientOption

func WithClientInstructionSet

func WithClientInstructionSet(is *InstructionSet) ClientOption

func WithClientTargetAddress

func WithClientTargetAddress(addr string) ClientOption

type ControlUnitType

type ControlUnitType string
const (
	ControlUnitTypePC  ControlUnitType = "ProgramCounter"
	ControlUnitTypeDAG ControlUnitType = "DAG"
)

type DatabaseInput

type DatabaseInput struct {
	Driver     string
	DSN        string
	Query      string
	Converters []QueryConverter
}

func NewDatabaseInput

func NewDatabaseInput(driver, dsn, query string, converters []QueryConverter) DatabaseInput

func (DatabaseInput) OperandValue

func (v DatabaseInput) OperandValue(ctx context.Context) (interface{}, error)

func (DatabaseInput) Type

func (v DatabaseInput) Type() string

type DatabasePool

type DatabasePool struct {
	// contains filtered or unexported fields
}

func NewDatabasePool

func NewDatabasePool() DatabasePool

func (*DatabasePool) Get

func (p *DatabasePool) Get(driver, dsn string) (*gorm.DB, error)

type EmptyInput

type EmptyInput struct {
}

func (EmptyInput) OperandValue

func (EmptyInput) OperandValue(context.Context) (interface{}, error)

func (EmptyInput) Type

func (EmptyInput) Type() string

type EmptyResult

type EmptyResult struct {
}

func (EmptyResult) InstructionResult

func (EmptyResult) InstructionResult(context.Context) (interface{}, error)

func (EmptyResult) Type

func (EmptyResult) Type() string

type ExecuteID

type ExecuteID = ID

func NewExecuteID

func NewExecuteID() ExecuteID

func ParseExecuteID

func ParseExecuteID(s string) (ExecuteID, error)

type Executor

type Executor struct {
	ID         ExecutorID
	Labels     Labels
	Host       Host
	State      State
	Error      error
	Limits     Resource
	Usages     Resource
	CreateTime Timestamp
	UpdateTime Timestamp
	FinishTime Timestamp
}

func NewExecutor

func NewExecutor() Executor

func (Executor) AddLabel

func (m Executor) AddLabel(label Label) Executor

func (Executor) ChangeState

func (m Executor) ChangeState(state State) (Executor, bool)

func (Executor) ExitOnError

func (m Executor) ExitOnError(err error) Executor

func (Executor) IsEmptyHost

func (m Executor) IsEmptyHost() bool

func (Executor) IsState

func (m Executor) IsState(state State) bool

type ExecutorID

type ExecutorID = ID

func NewExecutorID

func NewExecutorID() ExecutorID

func ParseExecutorID

func ParseExecutorID(s string) (ExecutorID, error)

type ExecutorLoadBalancer

type ExecutorLoadBalancer interface {
	Next(ctx context.Context, l []Executor) (Executor, error)
}

type ExecutorPool

type ExecutorPool struct {
	ExecutorRepository ExecutorRepository
	// contains filtered or unexported fields
}

func NewExecutorPool

func NewExecutorPool(er ExecutorRepository, is *InstructionSet) *ExecutorPool

func (*ExecutorPool) Add

func (m *ExecutorPool) Add(ctx context.Context, executor Executor) (err error)

func (*ExecutorPool) AddServerExecutor

func (m *ExecutorPool) AddServerExecutor() error

func (*ExecutorPool) Attach

func (m *ExecutorPool) Attach(c *executor) error

func (*ExecutorPool) Detach

func (m *ExecutorPool) Detach(c *executor) error

func (*ExecutorPool) Execute

func (m *ExecutorPool) Execute(ctx context.Context, op Instruction, args ...Instruction) (ir InstructionResult, err error)

func (*ExecutorPool) FindByHost

func (m *ExecutorPool) FindByHost(ctx context.Context, host Host) (ExecutorID, error)

func (*ExecutorPool) GetExecutor

func (m *ExecutorPool) GetExecutor(ctx context.Context, op OpCode) (e Executor, eh ExecutorHandler, err error)

func (*ExecutorPool) Remove

func (m *ExecutorPool) Remove(ctx context.Context, id ExecutorID, removeErr error) (err error)

type ExecutorRepository

type ExecutorRepository interface {
	Find(context.Context, ExecutorID) (Executor, error)
	Save(context.Context, *Executor) error
}

type Gotlin

type Gotlin struct {
	SchedulerRepository   SchedulerRepository
	ProgramRepository     ProgramRepository
	InstructionRepository InstructionRepository
	ExecutorRepository    ExecutorRepository
	EnableServer          bool
	ServerAddress         string
	ServerExecutor        bool
	GRPCOption            []grpc.ServerOption
	// contains filtered or unexported fields
}

func NewGotlin

func NewGotlin(options ...Option) (*Gotlin, error)

func (*Gotlin) QueryResult

func (g *Gotlin) QueryResult(ctx context.Context, p Program) (interface{}, error)

func (*Gotlin) RequestScheduler

func (g *Gotlin) RequestScheduler(ctx context.Context, option SchedulerOption) (SchedulerID, error)

func (*Gotlin) RunProgram

func (g *Gotlin) RunProgram(ctx context.Context, s SchedulerID, p Program, ins []Instructioner) error

func (*Gotlin) RunProgramSync

func (g *Gotlin) RunProgramSync(ctx context.Context, s SchedulerID, p Program, ins []Instructioner) (interface{}, error)

func (*Gotlin) StartServer

func (g *Gotlin) StartServer(ctx context.Context) (err error)

func (*Gotlin) StopServer

func (g *Gotlin) StopServer(graceful bool) (err error)

func (*Gotlin) WaitResult

func (g *Gotlin) WaitResult(ctx context.Context, ids []ProgramID) (chan ProgramResult, error)

type Host

type Host string

func (Host) IsEqual

func (v Host) IsEqual(v2 Host) bool

func (Host) String

func (v Host) String() string

type ID

type ID struct {
	// contains filtered or unexported fields
}

func NewID

func NewID() ID

func ParseID

func ParseID(s string) (ID, error)

func (ID) IsEqual

func (v ID) IsEqual(v2 ID) bool

func (ID) IsValid

func (v ID) IsValid() bool

func (ID) MarshalJSON

func (v ID) MarshalJSON() ([]byte, error)

func (ID) NonceString

func (v ID) NonceString() string

func (ID) String

func (v ID) String() string

func (*ID) UnmarshalJSON

func (v *ID) UnmarshalJSON(data []byte) error

type Immediate

type Immediate struct {
	Value interface{}
}

func (Immediate) OperandValue

func (v Immediate) OperandValue(context.Context) (interface{}, error)

func (Immediate) Type

func (Immediate) Type() string

type Instruction

type Instruction struct {
	ID         InstructionID
	OpCode     OpCode
	Operand    Operand
	Result     InstructionResult
	State      State
	Error      error
	CreateTime Timestamp
	UpdateTime Timestamp
	FinishTime Timestamp
}

func NewInstruction

func NewInstruction() Instruction

func (Instruction) ChangeDatabaseInput

func (m Instruction) ChangeDatabaseInput(v DatabaseInput) Instruction

func (Instruction) ChangeImmediateValue

func (m Instruction) ChangeImmediateValue(value interface{}) Instruction

func (Instruction) ChangeState

func (m Instruction) ChangeState(state State) (Instruction, bool)

func (Instruction) ChangeToArithmetic

func (m Instruction) ChangeToArithmetic(op OpCode) Instruction

func (Instruction) ChangeToWait

func (m Instruction) ChangeToWait(value interface{}) Instruction

func (Instruction) Finish

func (m Instruction) Finish(result InstructionResult, err error) Instruction

func (Instruction) Instruction

func (m Instruction) Instruction() Instruction

func (Instruction) InstructionResult

func (m Instruction) InstructionResult(ctx context.Context) (interface{}, error)

func (Instruction) IsState

func (m Instruction) IsState(state State) bool

func (Instruction) OperandValue

func (m Instruction) OperandValue(ctx context.Context) (interface{}, error)

func (Instruction) Reready

func (m Instruction) Reready() Instruction

type InstructionDAG

type InstructionDAG struct {
	// contains filtered or unexported fields
}

func NewInstructionDAG

func NewInstructionDAG() InstructionDAG

func ParseInstructionDAG

func ParseInstructionDAG(s string) (InstructionDAG, error)

func (InstructionDAG) Add

func (d InstructionDAG) Add(ins ...InstructionID) error

func (InstructionDAG) Ancestors

func (d InstructionDAG) Ancestors() []InstructionID

func (InstructionDAG) AttachChildren

func (d InstructionDAG) AttachChildren(parent InstructionID, children ...InstructionID) error

func (InstructionDAG) Children

func (d InstructionDAG) Children(parent InstructionID) ([]InstructionID, error)

func (InstructionDAG) Iterator

func (d InstructionDAG) Iterator(ctx context.Context) chan InstructionID

func (InstructionDAG) MarshalString

func (d InstructionDAG) MarshalString() string

type InstructionEdge

type InstructionEdge struct {
	SrcID string `json:"s"`
	DstID string `json:"d"`
}

func (InstructionEdge) Edge

func (e InstructionEdge) Edge() (srcID, dstID string)

type InstructionHandler

type InstructionHandler struct {
	OpCode   OpCode
	Executor ExecutorHandler
}

type InstructionID

type InstructionID = ID

func NewInstructionID

func NewInstructionID() InstructionID

func ParseInstructionID

func ParseInstructionID(s string) (InstructionID, error)

type InstructionRef

type InstructionRef struct {
	// contains filtered or unexported fields
}

func NewInstructionRef

func NewInstructionRef(in Instruction) InstructionRef

func (InstructionRef) Instruction

func (m InstructionRef) Instruction() Instruction

func (InstructionRef) IsRef

func (m InstructionRef) IsRef()

type InstructionRefer

type InstructionRefer interface {
	Instructioner
	IsRef()
}

type InstructionRepository

type InstructionRepository interface {
	Find(context.Context, InstructionID) (Instruction, error)
	Save(context.Context, *Instruction) error
}

type InstructionResult

type InstructionResult struct {
	Type   string
	Result InstructionResulter
}

func ExecuteArithmeticInstruction

func ExecuteArithmeticInstruction(ctx context.Context, op Instruction, args ...Instruction) (InstructionResult, error)

func ExecuteCollectionInstruction

func ExecuteCollectionInstruction(ctx context.Context, op Instruction, args ...Instruction) (InstructionResult, error)

func ExecuteInputInstruction

func ExecuteInputInstruction(ctx context.Context, op Instruction, args ...Instruction) (InstructionResult, error)

func ExecuteMoveInstruction

func ExecuteMoveInstruction(ctx context.Context, op Instruction, args ...Instruction) (InstructionResult, error)

func ExecuteWaitInstruction

func ExecuteWaitInstruction(ctx context.Context, op Instruction, args ...Instruction) (InstructionResult, error)

func NewEmptyInstructionResult

func NewEmptyInstructionResult() InstructionResult

func NewRegisterResult

func NewRegisterResult(u interface{}) InstructionResult

func (InstructionResult) InstructionResult

func (v InstructionResult) InstructionResult(ctx context.Context) (interface{}, error)

func (InstructionResult) RegisterValue

func (v InstructionResult) RegisterValue() (interface{}, bool)

func (*InstructionResult) UnmarshalJSON

func (v *InstructionResult) UnmarshalJSON(data []byte) (err error)

func (*InstructionResult) UnmarshalMsgpack

func (v *InstructionResult) UnmarshalMsgpack(data []byte) error

type InstructionResulter

type InstructionResulter interface {
	Type() string
	InstructionResult(context.Context) (interface{}, error)
}

type InstructionSet

type InstructionSet struct {
	// contains filtered or unexported fields
}

func NewInstructionSet

func NewInstructionSet() *InstructionSet

func (*InstructionSet) ClearDefaults

func (m *InstructionSet) ClearDefaults()

func (*InstructionSet) GetExecutorHandler

func (m *InstructionSet) GetExecutorHandler(op OpCode) (ExecutorHandler, error)

func (*InstructionSet) OpCodeLabel

func (m *InstructionSet) OpCodeLabel() Label

func (*InstructionSet) Register

func (m *InstructionSet) Register(handler InstructionHandler) error

func (*InstructionSet) Unregister

func (m *InstructionSet) Unregister(handler InstructionHandler) error

type InstructionVertex

type InstructionVertex struct {
	WrappedID     string        `json:"i"`
	InstructionID InstructionID `json:"v"`
}

func NewInstructionVertex

func NewInstructionVertex(id string, in InstructionID) InstructionVertex

func (InstructionVertex) ID

func (v InstructionVertex) ID() string

func (InstructionVertex) Vertex

func (v InstructionVertex) Vertex() (id string, value interface{})

type Instructioner

type Instructioner interface {
	Instruction() Instruction
}

type Label

type Label struct {
	Key   string
	Value string
}

func NewLabel

func NewLabel(k, v string) Label

type Labels

type Labels []Label

func NewLabels

func NewLabels(kv ...string) Labels

func (Labels) Add

func (v Labels) Add(l Label) Labels

func (Labels) ExistOpCode

func (v Labels) ExistOpCode(op OpCode) bool

func (Labels) Find

func (v Labels) Find(key string) (string, bool)

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})

	Debug(args ...interface{})
	Info(args ...interface{})
	Print(args ...interface{})
	Warn(args ...interface{})

	Debugln(args ...interface{})
	Infoln(args ...interface{})
	Println(args ...interface{})
	Warnln(args ...interface{})
}

type Map

type Map map[string]interface{}

type Marshal

type Marshal func(v interface{}) ([]byte, error)

type MarshalType

type MarshalType string
var (
	MarshalTypeJSON    MarshalType = "json"
	MarshalTypeMsgPack MarshalType = "msgpack"
	MarshalTypeGob     MarshalType = "gob"
)

type OpCode

type OpCode string
const (
	OpCodeWait      OpCode = "WAIT"
	OpCodeAdd       OpCode = "ADD"
	OpCodeSub       OpCode = "SUB"
	OpCodeMul       OpCode = "MUL"
	OpCodeDiv       OpCode = "DIV"
	OpCodeIn        OpCode = "IN"
	OpCodeMove      OpCode = "MOV"
	OpCodeIntersect OpCode = "INTERSECT"
	OpCodeUnion     OpCode = "UNION"
	OpCodeDiff      OpCode = "DIFF"
)

func ParseOpCode

func ParseOpCode(s string) (OpCode, error)

func (OpCode) String

func (v OpCode) String() string

type Operand

type Operand struct {
	Type  string
	Value OperandValuer
}

func NewDatabaseInputOperand

func NewDatabaseInputOperand(v DatabaseInput) Operand

func NewEmptyOperand

func NewEmptyOperand() Operand

func NewImmediateValue

func NewImmediateValue(u interface{}) Operand

func (*Operand) ImmediateValue

func (v *Operand) ImmediateValue() (interface{}, bool)

func (Operand) OperandValue

func (v Operand) OperandValue(ctx context.Context) (interface{}, error)

func (*Operand) UnmarshalJSON

func (v *Operand) UnmarshalJSON(data []byte) (err error)

func (*Operand) UnmarshalMsgpack

func (v *Operand) UnmarshalMsgpack(data []byte) error

type OperandValuer

type OperandValuer interface {
	Type() string
	OperandValue(context.Context) (interface{}, error)
}

type Option

type Option func(g *Gotlin)

func WithDatabase

func WithDatabase(db *gorm.DB) Option

func WithEnableServer

func WithEnableServer(enable bool) Option

func WithGRPCServerOption

func WithGRPCServerOption(options ...grpc.ServerOption) Option

func WithInstructionSet

func WithInstructionSet(is *InstructionSet) Option

func WithServerAddress

func WithServerAddress(addr string) Option

func WithServerExecutor

func WithServerExecutor(enable bool) Option

type Processor

type Processor interface {
	Process(context.Context, Program) error
}

type ProcessorContext

type ProcessorContext struct {
	ControlUnit ControlUnitType
	Core        int
	Data        string
}

func NewDAGProcessorContext

func NewDAGProcessorContext(d InstructionDAG, core int) ProcessorContext

func NewProcessorContext

func NewProcessorContext() ProcessorContext

func (ProcessorContext) ChangePC

func (ProcessorContext) CurrentPC

func (v ProcessorContext) CurrentPC() (InstructionID, bool)

func (ProcessorContext) IsDAG

func (v ProcessorContext) IsDAG() bool

func (ProcessorContext) IsPC

func (v ProcessorContext) IsPC() bool

type Program

type Program struct {
	ID         ProgramID
	Code       ProgramCode
	State      State
	Error      error
	Processor  ProcessorContext
	CreateTime Timestamp
	UpdateTime Timestamp
	FinishTime Timestamp
}

func NewProgram

func NewProgram() Program

func (Program) AddInstruction

func (m Program) AddInstruction(id InstructionID) Program

func (Program) ChangeProcessor

func (m Program) ChangeProcessor(p ProcessorContext) Program

func (Program) ChangeState

func (m Program) ChangeState(state State) (Program, bool)

func (Program) ExitOnError

func (m Program) ExitOnError(err error) Program

func (Program) IsDAGProcessor

func (m Program) IsDAGProcessor() bool

func (Program) IsPCProcessor

func (m Program) IsPCProcessor() bool

func (Program) IsState

func (m Program) IsState(state State) bool

func (Program) NextPC

func (m Program) NextPC(id InstructionID) Program

func (Program) Reready

func (m Program) Reready() Program

type ProgramCode

type ProgramCode struct {
	Instructions []InstructionID
}

func NewProgramCode

func NewProgramCode() ProgramCode

func (ProgramCode) AddInstruction

func (v ProgramCode) AddInstruction(id InstructionID) ProgramCode

func (ProgramCode) IsEqual

func (v ProgramCode) IsEqual(v2 ProgramCode) bool

type ProgramID

type ProgramID = ID

func NewProgramID

func NewProgramID() ProgramID

func ParseProgramID

func ParseProgramID(s string) (ProgramID, error)

type ProgramRepository

type ProgramRepository interface {
	Find(context.Context, ProgramID) (Program, error)
	Save(context.Context, *Program) error
}

type ProgramResult

type ProgramResult struct {
	ID     ProgramID
	Result interface{}
	Error  error
}

type QueryConverter

type QueryConverter string
const QueryConverterFirstValue QueryConverter = "FirstValue"
const QueryConverterFlat QueryConverter = "Flat"

type RegisterExecutorOption

type RegisterExecutorOption struct {
	ID          ExecutorID
	Host        Host
	Labels      Labels
	CallOptions []grpc.CallOption
}

type RegisterResult

type RegisterResult struct {
	Value interface{}
}

func (RegisterResult) InstructionResult

func (v RegisterResult) InstructionResult(context.Context) (interface{}, error)

func (RegisterResult) Type

func (RegisterResult) Type() string

type RequestSchedulerOption

type RequestSchedulerOption struct {
	SchedulerOption
	CallOptions []grpc.CallOption
}

type Resource

type Resource struct {
	CPU       int64
	Memory    int64
	Disk      int64
	Bandwidth int64
}

func NewEmptyResource

func NewEmptyResource() Resource

type RunProgramOption

type RunProgramOption struct {
	SchedulerID  SchedulerID
	Program      Program
	Instructions []Instructioner
	CallOptions  []grpc.CallOption
}

type ScheduledPrograms

type ScheduledPrograms struct {
	Programs []ProgramID
}

func NewScheduledPrograms

func NewScheduledPrograms() ScheduledPrograms

func (ScheduledPrograms) AddProgram

type Scheduler

type Scheduler struct {
	ID         SchedulerID
	Programs   ScheduledPrograms
	CreateTime Timestamp
	UpdateTime Timestamp
	FinishTime Timestamp
}

func NewScheduler

func NewScheduler() Scheduler

func (Scheduler) AddProgram

func (m Scheduler) AddProgram(id ProgramID) Scheduler

type SchedulerID

type SchedulerID = ID

func NewSchedulerID

func NewSchedulerID() SchedulerID

func ParseSchedulerID

func ParseSchedulerID(s string) (SchedulerID, error)

type SchedulerOption

type SchedulerOption struct {
	// contains filtered or unexported fields
}

func NewSchedulerOption

func NewSchedulerOption() SchedulerOption

type SchedulerPool

type SchedulerPool struct {
	ExecutorPool          *ExecutorPool
	SchedulerRepository   SchedulerRepository
	ProgramRepository     ProgramRepository
	InstructionRepository InstructionRepository
	// contains filtered or unexported fields
}

func (*SchedulerPool) Close

func (sp *SchedulerPool) Close() error

func (*SchedulerPool) QueryResult

func (sp *SchedulerPool) QueryResult(ctx context.Context, p Program) (interface{}, error)

func (*SchedulerPool) RequestScheduler

func (sp *SchedulerPool) RequestScheduler(ctx context.Context, option SchedulerOption) (SchedulerID, error)

func (*SchedulerPool) RunProgram

func (sp *SchedulerPool) RunProgram(ctx context.Context, sid SchedulerID, p Program, ins []Instructioner) error

func (*SchedulerPool) RunProgramSync

func (sp *SchedulerPool) RunProgramSync(ctx context.Context, sid SchedulerID, p Program, ins []Instructioner) (value interface{}, err error)

func (*SchedulerPool) WaitResult

func (sp *SchedulerPool) WaitResult(ctx context.Context, ids []ProgramID) (chan ProgramResult, error)

type SchedulerRepository

type SchedulerRepository interface {
	Find(context.Context, SchedulerID) (Scheduler, error)
	Save(context.Context, *Scheduler) error
}

type StartComputeNodeOption

type StartComputeNodeOption struct {
	CallOptions []grpc.CallOption
}

type State

type State string
const (
	StateNew     State = "New"
	StateReady   State = "Ready"
	StateRunning State = "Running"
	StateBlocked State = "Blocked"
	StateExit    State = "Exit"
)

type StorableDAG

type StorableDAG struct {
	InstructionVertices []InstructionVertex `json:"vs"`
	InstructionEdges    []InstructionEdge   `json:"es"`
}

func (StorableDAG) Edges

func (d StorableDAG) Edges() []dag.Edger

func (StorableDAG) Vertices

func (d StorableDAG) Vertices() []dag.Vertexer

type Timestamp

type Timestamp int64

func NewTimestamp

func NewTimestamp() Timestamp

func ParseTimestamp

func ParseTimestamp(v int64) Timestamp

func (Timestamp) IsEqual

func (v Timestamp) IsEqual(v2 Timestamp) bool

func (Timestamp) Value

func (v Timestamp) Value() int64

type Unmarshal

type Unmarshal func(data []byte, v interface{}) error

type UnregisterExecutorOption

type UnregisterExecutorOption struct {
	ID          ExecutorID
	Error       error
	CallOptions []grpc.CallOption
}

type VisitorFunc

type VisitorFunc func(dag.Vertexer)

func (VisitorFunc) Visit

func (f VisitorFunc) Visit(v dag.Vertexer)

type WaitResultOption

type WaitResultOption struct {
	IDs         []ProgramID
	CallOptions []grpc.CallOption
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL