sql

package module
v0.3.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2021 License: Apache-2.0 Imports: 16 Imported by: 10

README

SQL

基于 fns.Service 实现的内部 SQL 服务,讲 sql 操作服务化,同时支持分布式事务。

安装

go get github.com/aacfactory/fns-contrib/databases/sql@main

使用

配置文件
  • 单机
    • masterSlaverMode = false,dsn 列表为一个元素。
  • 主从
    • masterSlaverMode = true,dsn 列表第一个元素为主服务地址,后续为从服务地址。
  • 集群
    • masterSlaverMode = false,dsn 列表多元素。
{
  "sql": {
    "masterSlaverMode": false,
    "driver": "",
    "dsn": [
      "username:password@tcp(ip:port)/databases" // 也可以是 sql.Open() 中的参数值
    ],
    "maxIdles": 0,
    "maxOpens": 0
  }
}
导入驱动

fns.sql 本身不带驱动,需要导入与配置文件中相同的驱动。

import _ "github.com/go-sql-driver/mysql"
服务部署
  • fns为单机模式
    • 直接部署
  • fns为分布式模式
    • 可以单独起一个(一组)只有 sql 服务的应用(推荐)。
    • 也可以与fns单机模式一样使用。
    • 支持分布式事务。
app.Deply(sql.Service())
代理使用

具体参考 proxy.go

// 在上下文中开启事务
sql.TxBegin(ctx)
// 提交上下文中的事务
sql.TxCommit(ctx)
// 查询,如果 param 中设置在事务中查询,则使用事务查询
sql.Query(ctx, param)
// 执行,如果 param 中设置在事务中查询,则使用事务查询
sql.Execute(ctx, param)

分布式事务(GlobalTransactionManagement)

使用以请求编号绑定事务,并在请求上下文中标记事务所在服务,在服务发现的精确发现功能中把同一个请求上下文(无论在哪个节点)都转发到事务所在服务。
注意事项:

  • 事务开启时需求一个 timeout,默认是10秒,当在这个时间内没有被提交或回滚,超时后会自动回滚。
  • 使用分布式事务的最佳方式是采样 proxy 中的函数,而非其它自行代理操作。
  • 部署的方式最好是以单独服务的方式(一个fns内只有 sql 服务)部署一个集群。

Documentation

Index

Constants

View Source
const (
	StringType  = ColumnType("string")
	IntType     = ColumnType("int")
	FloatType   = ColumnType("float")
	BytesType   = ColumnType("bytes")
	JsonType    = ColumnType("json")
	BoolType    = ColumnType("bool")
	TimeType    = ColumnType("time")
	UnknownType = ColumnType("unknown")
)
View Source
const (
	Namespace = "sql"

	TxBeginFn    = "tx_begin"
	TxCommitFn   = "tx_commit"
	TxRollbackFn = "tx_rollback"
	QueryFn      = "query"
	ExecuteFn    = "execute"
)

Variables

This section is empty.

Functions

func Service

func Service() fns.Service

func TxBegin

func TxBegin(ctx fns.Context, param TxBeginParam) (err errors.CodeError)

func TxCommit

func TxCommit(ctx fns.Context) (err errors.CodeError)

func TxRollback

func TxRollback(ctx fns.Context) (err errors.CodeError)

Types

type Client

type Client interface {
	Reader() (v *db.DB)
	Writer() (v *db.DB)
	Close() (err error)
}

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(databases []*db.DB) (client *Cluster)

func (*Cluster) Close

func (client *Cluster) Close() (err error)

func (*Cluster) Reader

func (client *Cluster) Reader() (v *db.DB)

func (*Cluster) Writer

func (client *Cluster) Writer() (v *db.DB)

type Column

type Column struct {
	Type  ColumnType      `json:"type,omitempty"`
	Name  string          `json:"name,omitempty"`
	Value json.RawMessage `json:"value,omitempty"`
	Nil   bool            `json:"nil,omitempty"`
	// contains filtered or unexported fields
}

func (*Column) Scan

func (c *Column) Scan(src interface{}) error

type ColumnType

type ColumnType string

type Config

type Config struct {
	Driver           string   `json:"driver"`
	MasterSlaverMode bool     `json:"masterSlaverMode,omitempty"`
	DSN              []string `json:"dsn,omitempty"`
	MaxIdles         int      `json:"maxIdles,omitempty"`
	MaxOpens         int      `json:"maxOpens,omitempty"`
}

func (*Config) CreateClient

func (config *Config) CreateClient() (client Client, err error)

type ExecResult

type ExecResult struct {
	Affected     int64 `json:"affected,omitempty"`
	LastInsertId int64 `json:"lastInsertId,omitempty"`
}

func Execute

func Execute(ctx fns.Context, param Param) (result *ExecResult, err errors.CodeError)

type Executor

type Executor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (db.Result, error)
}

type FieldColumn added in v0.3.10

type FieldColumn struct {
	Kind      string
	FieldType reflect.Type
	Column    *Column
}

type GlobalTransaction

type GlobalTransaction struct {
	// contains filtered or unexported fields
}

type GlobalTransactionManagement

type GlobalTransactionManagement struct {
	// contains filtered or unexported fields
}

func NewGlobalTransactionManagement

func NewGlobalTransactionManagement() *GlobalTransactionManagement

func (*GlobalTransactionManagement) Close

func (gtm *GlobalTransactionManagement) Close()

func (*GlobalTransactionManagement) Del

func (gtm *GlobalTransactionManagement) Del(ctx fns.Context)

func (*GlobalTransactionManagement) Get

func (gtm *GlobalTransactionManagement) Get(ctx fns.Context) (tx *sql.Tx, has bool)

func (*GlobalTransactionManagement) Set

func (gtm *GlobalTransactionManagement) Set(ctx fns.Context, tx *sql.Tx, timeout time.Duration) (err error)

type KDB

type KDB struct {
	// contains filtered or unexported fields
}

func (*KDB) Key

func (k *KDB) Key() string

type MasterSlaver

type MasterSlaver struct {
	// contains filtered or unexported fields
}

func NewMasterSlaver

func NewMasterSlaver(master *db.DB, slavers []*db.DB) (client *MasterSlaver)

func (*MasterSlaver) Close

func (client *MasterSlaver) Close() (err error)

func (*MasterSlaver) Reader

func (client *MasterSlaver) Reader() (v *db.DB)

func (*MasterSlaver) Writer

func (client *MasterSlaver) Writer() (v *db.DB)

type NullJson

type NullJson struct {
	Json  json.RawMessage
	Valid bool
}

func (*NullJson) Scan

func (v *NullJson) Scan(src interface{}) error

type NullSQLRaw

type NullSQLRaw struct {
	Raw   db.RawBytes
	Valid bool
}

func (*NullSQLRaw) Scan

func (v *NullSQLRaw) Scan(src interface{}) error

type Param

type Param struct {
	Query string `json:"query,omitempty"`
	Args  *Tuple `json:"args,omitempty"`
	InTx  bool   `json:"inTx,omitempty"`
}

type QueryAble

type QueryAble interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*db.Rows, error)
}

type Row

type Row struct {
	Columns []*Column `json:"columns,omitempty"`
}

func (*Row) Scan

func (r *Row) Scan(target interface{}) (err error)

type Rows

type Rows struct {
	Values []*Row `json:"values,omitempty"`
}

func NewRows

func NewRows(raws *db.Rows) (r *Rows, err error)

func Query

func Query(ctx fns.Context, param Param) (rows *Rows, err errors.CodeError)

func (*Rows) Empty

func (r *Rows) Empty() (ok bool)

func (*Rows) Scan

func (r *Rows) Scan(v interface{}) (err error)

func (*Rows) Size

func (r *Rows) Size() int

type Standalone

type Standalone struct {
	// contains filtered or unexported fields
}

func NewStandalone

func NewStandalone(v *db.DB) (client *Standalone)

func (*Standalone) Close

func (client *Standalone) Close() (err error)

func (*Standalone) Reader

func (client *Standalone) Reader() (v *db.DB)

func (*Standalone) Writer

func (client *Standalone) Writer() (v *db.DB)

type Tuple

type Tuple struct {
	// contains filtered or unexported fields
}

func NewTuple

func NewTuple() *Tuple

func (*Tuple) Append

func (t *Tuple) Append(values ...interface{}) *Tuple

func (Tuple) MarshalJSON

func (t Tuple) MarshalJSON() (p []byte, err error)

func (*Tuple) UnmarshalJSON

func (t *Tuple) UnmarshalJSON(p []byte) (err error)

type TxAddress

type TxAddress struct {
	Address string `json:"address,omitempty"`
}

type TxBeginParam

type TxBeginParam struct {
	Timeout   time.Duration     `json:"timeout,omitempty"`
	Isolation db.IsolationLevel `json:"isolation,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL