sql

package module
v0.9.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2022 License: Apache-2.0 Imports: 20 Imported by: 10

README

SQL

基于 fns.Service 实现的内部 SQL 服务,将 sql 操作服务化,同时支持分布式事务。

安装

go get github.com/aacfactory/fns-contrib/databases/sql

使用

配置文件
  • 单机
    • masterSlaverMode = false,dsn 列表为一个元素。
  • 主从
    • masterSlaverMode = true,dsn 列表第一个元素为主服务地址,后续为从服务地址。
  • 集群
    • masterSlaverMode = false,dsn 列表多元素。
{
  "sql": {
    "masterSlaverMode": false,
    "driver": "",
    "dsn": [
      "username:password@tcp(ip:port)/databases" // 也可以是 sql.Open() 中的参数值
    ],
    "maxIdles": 0,
    "maxOpens": 0,
    "lettersCase": "" // LOWER OR UPPER, EMPTY IS DISCARD
  }
}
导入驱动

fns.sql 本身不带驱动,需要导入与配置文件中相同的驱动。

import _ "github.com/go-sql-driver/mysql"
服务部署
  • fns为单机模式
    • 直接部署
  • fns为分布式模式
    • 可以单独起一个(一组)只有 sql 服务的应用(推荐)。
    • 也可以与fns单机模式一样使用。
    • 支持分布式事务。
app.Deploy(sql.Service())
// 手动标注方言;不标注时会根据实际使用的 driver 自动标注。
sql.RegisterDialect("postgres")
代理使用

具体参考 proxy.go

// 在上下文中开启事务
sql.BeginTransaction(ctx)
// 提交上下文中的事务
sql.CommitTransaction(ctx)
// 查询,如果 param 中设置在事务中查询,则使用事务查询
sql.Query(ctx, param)
// 执行,如果 param 中设置在事务中执行,则使用事务执行
sql.Execute(ctx, param)

分布式事务(GlobalTransactionManagement)

以请求编号绑定事务,并在请求上下文中标记事务所在服务,再通过服务发现的精确发现功能,把同一个请求上下文(无论在哪个节点)都转发到事务所在服务。
注意事项:

  • 事务开启时需要一个 timeout,默认是10秒,当在这个时间内没有被提交或回滚,超时后会自动回滚。
  • 使用分布式事务的最佳方式是采用 proxy 中的函数,而非其它自行代理操作。
  • 部署的方式最好是以单独服务的方式(一个fns内只有 sql 服务)部署一个集群。

DAO (ORM)

fns.sql 提供一个 ORM 类型的 Database Access Object,支持二级缓存。

配置

cacheKind: 二级缓存的类型,默认是local。当为redis时,后续ttl为expire的时间(Duration格式)。

{
  "sql": {
    "dao": {
      "cacheKind": "redis",
      "options":{
        "ttl": "30s"
      }
    }
  }
}
映射
type UserRow struct {
	Id         string       `col:"ID,PK"` // PK,标识为主键
	CreateBY   string       `col:"CREATE_BY,ACB"` // ACB,创建人(如果设置,则当为空时自动使用上下文中的user id)
	CreateAT   time.Time    `col:"CREATE_AT,ACT"` // ACT,创建日期(如果设置,则当为空时自动使用当前时间)
	ModifyBY   string       `col:"MODIFY_BY,AMB"` // AMB,修改人(如果设置,则当为空时自动使用上下文中的user id)
	ModifyAT   time.Time    `col:"MODIFY_AT,AMT"` // AMT,修改日期(如果设置,则当为空时自动使用当前时间)
	DeleteBY   string       `col:"DELETE_BY,ADB"` // ADB,删除人(如果设置,则当为空时自动使用上下文中的user id)
	DeleteAT   time.Time    `col:"DELETE_AT,ADT"` // ADT,删除日期(如果设置,则当为空时自动使用当前时间)
	Version    int64        `col:"VERSION,OL"` // OL,乐观锁(如果设置,会自动处理)
	Name       string       `col:"NAME"`
	Password   string       `col:"PASSWORD"`
	Gender     string       `col:"GENDER"`
	Age        int          `col:"AGE"`
	Active     bool         `col:"ACTIVE"`
	SignUpTime time.Time    `col:"SIGN_UP_TIME"`
	Profile    *UserProfile `col:"PROFILE,JSON"` // JSON,自动编码转换
	Score      float64      `col:"SCORE"`
	DOB        time.Time    `col:"DOB"`
}

func (r *UserRow) Table() (string, string, string) {
	return "FNS", "USER", "U" // schema,table name,table alias
}

// json
type UserProfile struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

type PostRow struct {
  Id       string            `col:"ID,PK"`
  CreateBY string            `col:"CREATE_BY,ACB"`
  CreateAT time.Time         `col:"CREATE_AT,ACT"`
  ModifyBY string            `col:"MODIFY_BY,AMB"`
  ModifyAT time.Time         `col:"MODIFY_AT,AMT"`
  Version  int64             `col:"VERSION,OL"`
  Title    string            `col:"TITLE"`
  Content  string            `col:"CONTENT"`
  Author   *UserRow          `col:"AUTHOR_ID,FK"` // FK,外键(当设置后,会自动读出,如果追加SYNC(FK:SYNC),会自动触发写操作)
  Likes    int               `col:"LIKES,VC" src:"SELECT COUNT(1) FROM \"FNS\".\"POST_LIKE\" WHERE \"POST_ID\" = \"P\".\"ID\" "` // VC,虚拟列
  Comments []*PostCommentRow `col:"COMMENTS,LK:SYNC" ref:"ID,POST_ID" sort:"CREATE_AT DESC"` // LK,一对多(当设置后,会自动读出,如果追加SYNC(LK:SYNC),会自动触发写操作)
}

func (r *PostRow) Table() (string, string, string) {
    return "FNS", "POST", "P"
}


type PostCommentRow struct {
  Id       string    `col:"ID,PK"`
  CreateBY string    `col:"CREATE_BY,ACB"`
  CreateAT time.Time `col:"CREATE_AT,ACT"`
  Post     *PostRow  `col:"POST_ID,FK"`
  User     *UserRow  `col:"USER_ID,FK"`
  Content  string    `col:"CONTENT"`
}

func (r *PostCommentRow) Table() (string, string, string) {
    return "FNS", "POST_COMMENT", "PC"
}
操作

获取DAO

dao := sql.DAO(ctx)

使用DAO

type DatabaseAccessObject interface {
	// 保存(Insert or Update)
    Save(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	// 插入
    Insert(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	// 更新
    Update(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	// 删除(当row设置ADB时,实际做更新操作)
    Delete(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	// 存在
    Exist(ctx fns.Context, row TableRow) (has bool, err errors.CodeError)
	// 获取
    Get(ctx fns.Context, row TableRow) (has bool, err errors.CodeError)
	// 查询
    Query(ctx fns.Context, param *QueryParam, rows interface{}) (has bool, err errors.CodeError)
    // 计数
	Count(ctx fns.Context, param *QueryParam, row TableRow) (num int, err errors.CodeError)
    // 分页
	Page(ctx fns.Context, param *QueryParam, rows interface{}) (page Paged, err errors.CodeError)
    // 清空一级缓存
	Close()
}

Documentation

Index

Constants

View Source
const (
	StringType  = ColumnType("string")
	IntType     = ColumnType("int")
	FloatType   = ColumnType("float")
	BytesType   = ColumnType("bytes")
	JsonType    = ColumnType("json")
	BoolType    = ColumnType("bool")
	TimeType    = ColumnType("time")
	UnknownType = ColumnType("unknown")
)

Variables

This section is empty.

Functions

func BeginTransaction added in v0.8.1

func BeginTransaction(ctx fns.Context) (err errors.CodeError)

func BeginTransactionWithOption added in v0.8.1

func BeginTransactionWithOption(ctx fns.Context, param BeginTransactionParam) (err errors.CodeError)

func CommitTransaction added in v0.8.1

func CommitTransaction(ctx fns.Context) (err errors.CodeError)

func RegisterDialect added in v0.6.0

func RegisterDialect(name string)

func RollbackTransaction added in v0.8.1

func RollbackTransaction(ctx fns.Context) (err errors.CodeError)

func Service

func Service() fns.Service

Types

type BeginTransactionParam added in v0.8.1

type BeginTransactionParam struct {
	Timeout   time.Duration     `json:"timeout,omitempty"`
	Isolation db.IsolationLevel `json:"isolation,omitempty"`
}

func DefaultTransactionOption added in v0.8.1

func DefaultTransactionOption() (v BeginTransactionParam)

func TransactionOption added in v0.8.1

func TransactionOption(timeout string, isolation db.IsolationLevel) (v BeginTransactionParam)

type Client

type Client interface {
	Reader() (v *db.DB)
	Writer() (v *db.DB)
	Close() (err error)
}

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(databases []*db.DB) (client *Cluster)

func (*Cluster) Close

func (client *Cluster) Close() (err error)

func (*Cluster) Reader

func (client *Cluster) Reader() (v *db.DB)

func (*Cluster) Writer

func (client *Cluster) Writer() (v *db.DB)

type Column

type Column struct {
	Type  ColumnType      `json:"type,omitempty"`
	Name  string          `json:"name,omitempty"`
	Value json.RawMessage `json:"value,omitempty"`
	Nil   bool            `json:"nil,omitempty"`
}

func (*Column) Decode added in v0.9.6

func (c *Column) Decode(v interface{}) (err error)

type ColumnScanner added in v0.9.6

type ColumnScanner struct {
	Column
	// contains filtered or unexported fields
}

func NewColumnScanner added in v0.9.6

func NewColumnScanner(ct *db.ColumnType) (scanner *ColumnScanner)

func (*ColumnScanner) Scan added in v0.9.6

func (c *ColumnScanner) Scan(src interface{}) error

type ColumnType

type ColumnType string

type Config

type Config struct {
	Driver           string    `json:"driver"`
	MasterSlaverMode bool      `json:"masterSlaverMode,omitempty"`
	DSN              []string  `json:"dsn,omitempty"`
	MaxIdles         int       `json:"maxIdles,omitempty"`
	MaxOpens         int       `json:"maxOpens,omitempty"`
	EnableDebugLog   bool      `json:"enableDebugLog"`
	LettersCase      string    `json:"lettersCase,omitempty"`
	DAO              DAOConfig `json:"dao,omitempty"`
}

func (*Config) CreateClient

func (config *Config) CreateClient() (client Client, err error)

type DAOConfig added in v0.6.0

type DAOConfig struct {
	CacheKind string          `json:"cacheKind,omitempty"`
	Raw       json.RawMessage `json:"options,omitempty"`
}

type DaoCache added in v0.6.0

type DaoCache interface {
	GetAndFill(row TableRow) (has bool, synced bool)
	Set(row TableRow, synced bool)
	Remove(row TableRow)
	Clean()
}

type DatabaseAccessObject added in v0.6.0

type DatabaseAccessObject interface {
	Save(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	Insert(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	Update(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	Delete(ctx fns.Context, rows ...TableRow) (affected int, err errors.CodeError)
	Exist(ctx fns.Context, row TableRow) (has bool, err errors.CodeError)
	Get(ctx fns.Context, row TableRow) (has bool, err errors.CodeError)
	Query(ctx fns.Context, param *QueryParam, rows interface{}) (has bool, err errors.CodeError)
	Count(ctx fns.Context, param *QueryParam, row TableRow) (num int, err errors.CodeError)
	Page(ctx fns.Context, param *QueryParam, rows interface{}) (page Paged, err errors.CodeError)
	Close()
}

func DAO added in v0.6.0

func DAO(ctx fns.Context) (v DatabaseAccessObject)

type ExecResult

type ExecResult struct {
	Affected     int64 `json:"affected,omitempty"`
	LastInsertId int64 `json:"lastInsertId,omitempty"`
}

func Execute

func Execute(ctx fns.Context, param Param) (result *ExecResult, err errors.CodeError)

type Executor

type Executor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (db.Result, error)
}

type FieldColumn added in v0.3.10

type FieldColumn struct {
	Kind      string
	FieldType reflect.Type
	Column    *Column
}

type GlobalTransaction

type GlobalTransaction struct {
	// contains filtered or unexported fields
}

type GlobalTransactionManagement

type GlobalTransactionManagement struct {
	// contains filtered or unexported fields
}

func NewGlobalTransactionManagement

func NewGlobalTransactionManagement() *GlobalTransactionManagement

func (*GlobalTransactionManagement) Begin added in v0.4.2

func (gtm *GlobalTransactionManagement) Begin(ctx fns.Context, db0 *db.DB, isolation db.IsolationLevel, timeout time.Duration) (err error)

func (*GlobalTransactionManagement) Close

func (gtm *GlobalTransactionManagement) Close()

func (*GlobalTransactionManagement) Commit added in v0.4.2

func (gtm *GlobalTransactionManagement) Commit(ctx fns.Context) (err error)

func (*GlobalTransactionManagement) Get

func (gtm *GlobalTransactionManagement) Get(ctx fns.Context) (tx *db.Tx, has bool)

func (*GlobalTransactionManagement) Rollback added in v0.4.2

func (gtm *GlobalTransactionManagement) Rollback(ctx fns.Context)

type KDB

type KDB struct {
	// contains filtered or unexported fields
}

func (*KDB) Key

func (k *KDB) Key() string

type MasterSlaver

type MasterSlaver struct {
	// contains filtered or unexported fields
}

func NewMasterSlaver

func NewMasterSlaver(master *db.DB, slavers []*db.DB) (client *MasterSlaver)

func (*MasterSlaver) Close

func (client *MasterSlaver) Close() (err error)

func (*MasterSlaver) Reader

func (client *MasterSlaver) Reader() (v *db.DB)

func (*MasterSlaver) Writer

func (client *MasterSlaver) Writer() (v *db.DB)

type NullJson

type NullJson struct {
	Json  json.RawMessage
	Valid bool
}

func (*NullJson) Scan

func (v *NullJson) Scan(src interface{}) error

type NullSQLRaw

type NullSQLRaw struct {
	Raw   db.RawBytes
	Valid bool
}

func (*NullSQLRaw) Scan

func (v *NullSQLRaw) Scan(src interface{}) error

type Paged added in v0.6.0

type Paged struct {
	No    int
	Num   int
	Total int
}

type Param

type Param struct {
	Query string `json:"query,omitempty"`
	Args  *Tuple `json:"args,omitempty"`
}

type QueryAble

type QueryAble interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*db.Rows, error)
}

type QueryCondition added in v0.6.0

type QueryCondition struct {
	// contains filtered or unexported fields
}

type QueryConditions added in v0.6.0

type QueryConditions struct {
	// contains filtered or unexported fields
}

func NewQueryConditions added in v0.6.0

func NewQueryConditions() *QueryConditions

func (*QueryConditions) Between added in v0.6.0

func (c *QueryConditions) Between(column string, beg interface{}, end interface{}) *QueryConditions

func (*QueryConditions) Eq added in v0.6.0

func (c *QueryConditions) Eq(column string, value interface{}) *QueryConditions

func (*QueryConditions) GT added in v0.6.0

func (c *QueryConditions) GT(column string, value interface{}) *QueryConditions

func (*QueryConditions) GTE added in v0.6.0

func (c *QueryConditions) GTE(column string, value interface{}) *QueryConditions

func (*QueryConditions) In added in v0.6.0

func (c *QueryConditions) In(column string, values ...interface{}) *QueryConditions

func (*QueryConditions) LT added in v0.6.0

func (c *QueryConditions) LT(column string, value interface{}) *QueryConditions

func (*QueryConditions) LTE added in v0.6.0

func (c *QueryConditions) LTE(column string, value interface{}) *QueryConditions

func (*QueryConditions) Like added in v0.6.0

func (c *QueryConditions) Like(column string, value interface{}) *QueryConditions

func (*QueryConditions) NotEq added in v0.6.0

func (c *QueryConditions) NotEq(column string, value interface{}) *QueryConditions

type QueryParam added in v0.6.0

type QueryParam struct {
	// contains filtered or unexported fields
}

func NewQueryParam added in v0.6.0

func NewQueryParam() *QueryParam

func (*QueryParam) ASC added in v0.6.0

func (p *QueryParam) ASC(column string) *QueryParam

func (*QueryParam) Conditions added in v0.6.0

func (p *QueryParam) Conditions() *QueryConditions

func (*QueryParam) DESC added in v0.6.0

func (p *QueryParam) DESC(column string) *QueryParam

func (*QueryParam) Page added in v0.6.0

func (p *QueryParam) Page(no int, size int) *QueryParam

func (*QueryParam) Range added in v0.6.0

func (p *QueryParam) Range(offset int, length int) *QueryParam

type QuerySort added in v0.6.0

type QuerySort struct {
	// contains filtered or unexported fields
}

type Row

type Row struct {
	// contains filtered or unexported fields
}

func (*Row) Column added in v0.9.6

func (r *Row) Column(name string, value interface{}) (has bool, err error)

func (*Row) Columns

func (r *Row) Columns() (columns []*Column)

func (*Row) Empty added in v0.9.6

func (r *Row) Empty() (ok bool)

func (*Row) MarshalJSON added in v0.9.6

func (r *Row) MarshalJSON() (p []byte, err error)

func (*Row) Scan

func (r *Row) Scan(target interface{}) (err error)

func (*Row) UnmarshalJSON added in v0.9.6

func (r *Row) UnmarshalJSON(p []byte) (err error)

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

func NewRows

func NewRows(raws *db.Rows) (r *Rows, err error)

func Query

func Query(ctx fns.Context, param Param) (rows *Rows, err errors.CodeError)

func (*Rows) Empty

func (r *Rows) Empty() (ok bool)

func (*Rows) MarshalJSON added in v0.9.6

func (r *Rows) MarshalJSON() (p []byte, err error)

func (*Rows) Next added in v0.9.6

func (r *Rows) Next() (v *Row, has bool)

func (*Rows) Scan

func (r *Rows) Scan(v interface{}) (err error)

func (*Rows) Size

func (r *Rows) Size() int

func (*Rows) UnmarshalJSON added in v0.9.6

func (r *Rows) UnmarshalJSON(p []byte) (err error)

type Standalone

type Standalone struct {
	// contains filtered or unexported fields
}

func NewStandalone

func NewStandalone(v *db.DB) (client *Standalone)

func (*Standalone) Close

func (client *Standalone) Close() (err error)

func (*Standalone) Reader

func (client *Standalone) Reader() (v *db.DB)

func (*Standalone) Writer

func (client *Standalone) Writer() (v *db.DB)

type TableRow added in v0.6.0

type TableRow interface {
	Table() (namespace string, name string, alias string)
}

type Tuple

type Tuple struct {
	// contains filtered or unexported fields
}

func NewTuple

func NewTuple() *Tuple

func (*Tuple) Append

func (t *Tuple) Append(values ...interface{}) *Tuple

func (Tuple) MarshalJSON

func (t Tuple) MarshalJSON() (p []byte, err error)

func (*Tuple) Merge added in v0.9.6

func (t *Tuple) Merge(v *Tuple) *Tuple

func (*Tuple) Size added in v0.9.6

func (t *Tuple) Size() (n int)

func (*Tuple) UnmarshalJSON

func (t *Tuple) UnmarshalJSON(p []byte) (err error)

type TxAddress

type TxAddress struct {
	Address string `json:"address,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL