sqlkit

package module
v0.0.0-...-5e2cfb7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2023 License: Apache-2.0 Imports: 35 Imported by: 0

README

sqlkit

sql kits

TODO

  • mysql knowledge engineering and
  • mysql expert experience(e.g. performance_schema)
  • sql mock recorder
  • sql rewrite
  • sql flow control
  • sql metrics & automatic profiling
  • sql distributed lock
  • distributed transaction

Documentation

Overview

Type conversions for Scan.

The three clause BSD license (http://en.wikipedia.org/wiki/BSD_licenses)

Copyright (c) 2013-2019, DATA-DOG team All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
  • The name DataDog.lt may not be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Index

Constants

View Source
const (
	// MysqlAuditDriverName msyql+audit database driver name
	MysqlAuditDriverName = "audit:mysql"
)

Variables

View Source
var (
	// App app name
	App = ""

	// DefaultAlarmThreshold default alarm threshold(scan rows)
	DefaultAlarmThreshold int64 = 500

	// DefaultBannedThreshold default banned threshold(scan rows)
	DefaultBannedThreshold int64 = 100000

	// Now is time.Now
	Now = time.Now // used for test
)
View Source
var (
	// ErrBanned sql banned error, use errors.Is(err, ErrBanned) to assert
	ErrBanned = errors.WithError(errors.New("sql is banned"), errors.InvalidArgument)

	// ErrAlarm sql warning, use errors.Is(err, ErrAlarm) to assert
	ErrAlarm = errors.New("sql is alarmed")
)
View Source
var CSVColumnParser = func(s string) []byte {
	switch {
	case strings.ToLower(s) == "null":
		return nil
	}
	return []byte(s)
}

CSVColumnParser is a function which converts trimmed csv column string to a []byte representation. Currently transforms NULL to nil

View Source
var DefaultSeenSqlLogLevel = int(Alarm)

DefaultSeenSqlLogLevel default log level for seen sql

Functions

func ConvertAssign

func ConvertAssign(dest, src any) error

ConvertAssign is the same as convertAssignRows, but without the optional rows argument.

func DefaultShouldAudit

func DefaultShouldAudit(query string) bool

DefaultShouldAudit sql是否审计的默认实现

func MarshalMetric

func MarshalMetric(name string) string

func Wrap

func Wrap(driver driver.Driver, wrapper Middleware) driver.Driver

Wrap is used to create a new instrumented driver, it takes a vendor specific driver, and a Hooks instance to produce a new driver instance. It's usually used inside a sql.Register() statement

Types

type API

type API struct {
	PathPrefix string `json:"path_prefix,omitempty"`
	DB         *sql.DB
}

func (API) Locks

func (api API) Locks(w http.ResponseWriter, r *http.Request)

func (API) Stats

func (api API) Stats(w http.ResponseWriter, r *http.Request)

type AlarmType

type AlarmType int

AlarmType alarm type

const (
	// Normal normal, means not alarm
	Normal AlarmType = iota

	// Alarm warning, means index missing but the number of scan lines is not large, still let the sql go through
	Alarm

	// Banned banned, means index missing and the number of scan lines is not large, still let the sql will be banned
	Banned
)

func (AlarmType) MarshalJSON

func (at AlarmType) MarshalJSON() ([]byte, error)

func (AlarmType) String

func (at AlarmType) String() string

func (*AlarmType) UnmarshalJSON

func (at *AlarmType) UnmarshalJSON(data []byte) error

type ArgsRewriter

type ArgsRewriter interface {
	RewriterBase
	RewriteArgs(args []any) ([]any, error)
}

type Audit

type Audit struct {
	// DatabaseName database name
	DatabaseName string `json:"database_name"`

	// AlarmThreshold sql will be alarmed if scan rows great than this threshold, default to 500
	AlarmThreshold *int64 `json:"alarm_threshold"`

	// BannedThreshold sql will be banned if scan rows great than this threshold, default to 10w
	BannedThreshold int64 `json:"banned_threshold"`

	// SeenSqlLogLevel log level for seen sqls(not new found), if SeenSqlLogLevel <= AlarmType will be logged
	SeenSqlLogLevel *atomic.Int32 `json:"seen_sql_log_level,omitempty"`

	// Whitelist contains sqls which will not be audited
	Whitelist []string `json:"whitelist,omitempty"`

	// SqlCacheDuration sql explain result cache duration, default is forever
	SqlCacheDuration *utils.Duration `json:"sql_cache_duration,omitempty"`

	// ExplainExtraAlarmSubstrs alarm when explain extra contains the sub-string in this list
	ExplainExtraAlarmSubstrs []string `json:"explain_extra_alarm_substrs,omitempty"`

	// ShouldAuditFunc used to determine if a sql should be audited, default behavior is detect if startss with `select|insert|update|delete`
	// NOTE: it does contains the whitelist
	ShouldAuditFunc func(query string) bool `json:"-"`

	// ContextLogFields extract zap fileds list for logging, e.g. traceid...
	ContextLogFields func(context.Context) []zap.Field `json:"-"`
	// contains filtered or unexported fields
}

Audit audit sqls and alarm or ban according to some conditions

Usage:

import (
    sql "database/sql"

    "github.com/qustavo/sqlhooks/v2"
    "github.com/go-sql-driver/mysql"

    "gitlab.alibaba-inc.com/t3/pkg/sqlkit"
)

audit := &sqlkit.Audit{
    DatabaseName: "xxx",
}
err := audit.Provision(ctx)
sql.Register(sql.DriverName, sqlhooks.Wrap(&mysql.MySQLDriver{}, audit))
db, err := sql.Open(sqlkit.DriverName, ...)
err = audit.SetDB(db) // NOTE: reuse the same pool
err = audit.Validate()
// if err == nil, then you can use the db ...

func (*Audit) AddBlacklistQuery

func (audit *Audit) AddBlacklistQuery(query string, alarmType AlarmType, reason string)

AddBlacklistQuery 用于动态设定黑名单查询, 用于止血 注意:未持久化

func (*Audit) AddWhitelistQuery

func (audit *Audit) AddWhitelistQuery(query string)

SetWhitelistQuery 用于动态设定白名单查询, 如出现误判场景 NOTE: no persistence!

func (*Audit) After

func (audit *Audit) After(ctx context.Context, query string, args ...interface{}) (context.Context, error)

func (*Audit) Before

func (audit *Audit) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error)

func (*Audit) BlacklistAPI

func (audit *Audit) BlacklistAPI(w http.ResponseWriter, r *http.Request)

BlacklistQueryAPI

func (*Audit) ClearSqls

func (audit *Audit) ClearSqls() error

ClearSqls clear cached sqls

func (*Audit) ConfigAPI

func (audit *Audit) ConfigAPI(w http.ResponseWriter, r *http.Request)

ConfigAPI list config

func (*Audit) DelWhitelistQuery

func (audit *Audit) DelWhitelistQuery(query string)

func (*Audit) DeleteSql

func (audit *Audit) DeleteSql(query string) error

DeteleSql delete specified sql in cache

func (*Audit) DetectAlarmType

func (audit *Audit) DetectAlarmType(ers []mysql.ExplainRow) (alarmType AlarmType, reason string)

DetectAlarmType 根据Explain结果判断AlarmType

func (*Audit) ExecContext

func (audit *Audit) ExecContext(next ExecContext) ExecContext

func (*Audit) Explain

func (audit *Audit) Explain(ctx context.Context, query string, args ...interface{}) ([]mysql.ExplainRow, error)

Explain do mysql explain

func (*Audit) GetSql

func (audit *Audit) GetSql(query string) *Sql

GetSql get sql

func (*Audit) MetricsAPI

func (audit *Audit) MetricsAPI(w http.ResponseWriter, r *http.Request)

MetricsAPI list metrics

func (*Audit) Provision

func (audit *Audit) Provision(ctx context.Context) error

func (*Audit) QueryContext

func (audit *Audit) QueryContext(next QueryContext) QueryContext

func (*Audit) SetDB

func (audit *Audit) SetDB(db *sql.DB) error

func (*Audit) SetLogger

func (audit *Audit) SetLogger(logger *zap.Logger) error

func (*Audit) SetSeenSqlLogLevel

func (audit *Audit) SetSeenSqlLogLevel(level int32)

SetSeenSqlLogLevel 用于动态调整日志级别, 如用于trace分析问题

func (*Audit) SetSeenSqlLogLevelAPI

func (audit *Audit) SetSeenSqlLogLevelAPI(w http.ResponseWriter, r *http.Request)

SetSeenSqlLogLevelAPI set seen_sql_log_level

func (*Audit) SetSql

func (audit *Audit) SetSql(s *Sql) error

SetSql set sql used to set blacklist(note: no persistence)

func (*Audit) ShouldAudit

func (audit *Audit) ShouldAudit(query string) bool

func (*Audit) Sqls

func (audit *Audit) Sqls() map[string]*Sql

Sqls return all sqls cached for representation

func (*Audit) SqlsAPI

func (audit *Audit) SqlsAPI(w http.ResponseWriter, r *http.Request)

SqlsAPI list sqls

func (*Audit) Tables

func (audit *Audit) Tables(ctx context.Context) (map[string]*mysql.Table, error)

Tables return all tables cached for representation

func (*Audit) TablesAPI

func (audit *Audit) TablesAPI(w http.ResponseWriter, r *http.Request)

TablesAPI list tables

func (*Audit) Validate

func (audit *Audit) Validate() error

func (*Audit) WhitelistAPI

func (audit *Audit) WhitelistAPI(w http.ResponseWriter, r *http.Request)

WhitelistQueryAPI

func (*Audit) Whitelists

func (audit *Audit) Whitelists() []string

Whitelists 返回所有白名单查询, 包括静态配置和动态添加

type BlacklistRequest

type BlacklistRequest struct {
	Query     string    `json:"query"`
	AlarmType AlarmType `json:"alarm_type"`
	Reason    string    `json:"reason"`
}

func (BlacklistRequest) String

func (br BlacklistRequest) String() string

type Conn

type Conn struct {
	Conn driver.Conn
	// contains filtered or unexported fields
}

Conn implements a database/sql.driver.Conn

func (*Conn) Begin

func (conn *Conn) Begin() (driver.Tx, error)

func (*Conn) BeginTx

func (conn *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error)

func (*Conn) Close

func (conn *Conn) Close() error

func (*Conn) Prepare

func (conn *Conn) Prepare(query string) (driver.Stmt, error)

func (*Conn) PrepareContext

func (conn *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error)

type DecimalDecompose

type DecimalDecompose = decimalDecompose // used for tests

type Driver

type Driver struct {
	driver.Driver
	// contains filtered or unexported fields
}

Driver implements a database/sql/driver.Driver

func (*Driver) Open

func (drv *Driver) Open(name string) (driver.Conn, error)

Open opens a connection

type EmptyRows

type EmptyRows struct{}

func (*EmptyRows) Close

func (rs *EmptyRows) Close() error

func (*EmptyRows) Columns

func (rs *EmptyRows) Columns() []string

func (*EmptyRows) Next

func (rs *EmptyRows) Next(dest []driver.Value) error

type ExecContext

type ExecContext func(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)

type ExecerContext

type ExecerContext struct {
	*Conn
}

ExecerContext implements a database/sql.driver.ExecerContext

func (*ExecerContext) Exec

func (conn *ExecerContext) Exec(query string, args []driver.Value) (driver.Result, error)

func (*ExecerContext) ExecContext

func (conn *ExecerContext) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)

type ExecerQueryerContext

type ExecerQueryerContext struct {
	*Conn
	*ExecerContext
	*QueryerContext
}

ExecerQueryerContext implements database/sql.driver.ExecerContext and database/sql.driver.QueryerContext

type ExecerQueryerContextWithSessionResetter

type ExecerQueryerContextWithSessionResetter struct {
	*Conn
	*ExecerContext
	*QueryerContext
	*SessionResetter
}

ExecerQueryerContext implements database/sql.driver.ExecerContext and database/sql.driver.QueryerContext

type LogHooks

type LogHooks struct {
	Logger       *zap.Logger
	Level        zapcore.Level     `json:"level,omitempty"`
	FieldSize    int               `json:"field_size,omitempty"`
	FieldNameMap map[string]string `json:"field_name_map,omitempty"`
}

LogHooks log sqls with rt and traceid

func (*LogHooks) After

func (h *LogHooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error)

After hook will get the timestamp registered on the Before hook and print the elapsed time

func (*LogHooks) Before

func (h *LogHooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error)

Before hook will print the query with it's args and return the context with the timestamp TODO: before log + after log?

func (*LogHooks) OnError

func (h *LogHooks) OnError(ctx context.Context, err error, query string, args ...interface{}) error

type Middleware

type Middleware interface {
	ExecContext(ExecContext) ExecContext
	QueryContext(QueryContext) QueryContext
}

Middleware is a middleware which wrap a driver to another This work is based on `sqlhooks`, why define a new `Hook`? For `sqlhooks.Hooks` can not operate the arguments & returns but only ctx, which is not applicable for some scenarios, e.g. mock, cache, tranform, ... Experimental!!!

type Mock

type Mock struct {
	Name         string
	Playback     bool
	ExecReturns  *SyncMap[string, *Return[driver.Result]]
	QueryReturns *SyncMap[string, *Return[driver.Rows]]
}

func NewMock

func NewMock(opts ...MockOption) *Mock

func (*Mock) AddExec

func (m *Mock) AddExec(query string, ret *Return[driver.Result])

func (*Mock) AddQuery

func (m *Mock) AddQuery(query string, ret *Return[driver.Rows])

func (*Mock) Dump

func (m *Mock) Dump() error

Dump cache data to fixture

func (*Mock) ExecContext

func (m *Mock) ExecContext(next ExecContext) ExecContext

func (*Mock) Load

func (m *Mock) Load() error

Load load data from fixture

func (*Mock) QueryContext

func (m *Mock) QueryContext(next QueryContext) QueryContext

type MockOption

type MockOption func(*Mock)

func WithMockExecReturns

func WithMockExecReturns(m map[string]*Return[driver.Result]) MockOption

func WithMockName

func WithMockName(name string) MockOption

func WithMockPlayback

func WithMockPlayback(pb bool) MockOption

func WithMockQueryReturns

func WithMockQueryReturns(m map[string]*Return[driver.Rows]) MockOption

type QueryContext

type QueryContext func(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error)

type QueryerContext

type QueryerContext struct {
	*Conn
}

QueryerContext implements a database/sql.driver.QueryerContext

func (*QueryerContext) QueryContext

func (conn *QueryerContext) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error)

type Return

type Return[T any] struct {
	Value T
	Err   error
}

func NewReturn

func NewReturn[T any](value T, err error) *Return[T]

type Rewrite

type Rewrite struct {
	GlobalRewriter  *Rewriter            `json:"global_rewriter,omitempty"`
	CustomRewriters map[string]*Rewriter `json:"custom_rewriters,omitempty"`
}

Rewrite an aggregate rewriter, usually can be used in most cases

func (*Rewrite) Name

func (r *Rewrite) Name() string

func (*Rewrite) Provision

func (r *Rewrite) Provision(ctx context.Context) error

func (Rewrite) Rewrite

func (r Rewrite) Rewrite(sql string, args []any) (string, []any, error)

func (*Rewrite) SetLogger

func (r *Rewrite) SetLogger(logger *zap.Logger)

type Rewriter

type Rewriter struct {
	SqlRewriters  []SqlRewriter  `json:"sql_rewriters,omitempty"`
	ArgsRewriters []ArgsRewriter `json:"args_rewriters,omitempty"`
}

func (Rewriter) Name

func (rr Rewriter) Name() string

func (Rewriter) Provision

func (rr Rewriter) Provision(ctx context.Context) error

func (Rewriter) Rewrite

func (rr Rewriter) Rewrite(sql string, args []any) (string, []any, error)

func (*Rewriter) SetLogger

func (rr *Rewriter) SetLogger(logger *zap.Logger)

type RewriterBase

type RewriterBase interface {
	Name() string
	Provision(context.Context) error
	SetLogger(*zap.Logger)
}

type RewriterInterface

type RewriterInterface interface {
	RewriterBase
	Rewrite(sql string, args []any) (string, []any, error)
}

type Rows

type Rows struct {
	Converter driver.ValueConverter
	Cols      []string
	Rows      [][]driver.Value
	CloseErr  error
	Pos       int
	NextErr   map[int]error
}

Rows mainly copy from `sqlmock`, and used with `sqlkit.Mock` Experimental!!!

func NewRows

func NewRows(columns []string) *Rows

func (*Rows) AddRow

func (r *Rows) AddRow(values ...driver.Value) *Rows

AddRow composed from database driver.Value slice return the same instance to perform subsequent actions. Note that the number of values must match the number of columns

func (*Rows) Close

func (r *Rows) Close() error

func (*Rows) CloseError

func (r *Rows) CloseError(err error) *Rows

CloseError allows to set an error which will be returned by rows.Close function.

The close error will be triggered only in cases when rows.Next() EOF was not yet reached, that is a default sql library behavior

func (*Rows) Columns

func (r *Rows) Columns() []string

func (*Rows) FromCSVString

func (r *Rows) FromCSVString(s string) *Rows

FromCSVString build rows from csv string. return the same instance to perform subsequent actions. Note that the number of values must match the number of columns

func (*Rows) Next

func (r *Rows) Next(dest []driver.Value) error

advances to next row

func (*Rows) RowError

func (r *Rows) RowError(row int, err error) *Rows

RowError allows to set an error which will be returned when a given row number is read

type SessionResetter

type SessionResetter struct {
	*Conn
}

func (*SessionResetter) ResetSession

func (s *SessionResetter) ResetSession(ctx context.Context) error

type ShadowTable

type ShadowTable struct {
	Prefix string `json:"prefix,omitempty"`
	Suffix string `json:"suffix,omitempty"`
	// contains filtered or unexported fields
}

func (*ShadowTable) Enter

func (st *ShadowTable) Enter(in ast.Node) (ast.Node, bool)

func (*ShadowTable) Leave

func (st *ShadowTable) Leave(in ast.Node) (ast.Node, bool)

func (*ShadowTable) Name

func (st *ShadowTable) Name() string

func (*ShadowTable) Provision

func (st *ShadowTable) Provision(ctx context.Context) error

func (*ShadowTable) RewriteSql

func (st *ShadowTable) RewriteSql(sql string) (string, error)

func (*ShadowTable) SetLogger

func (st *ShadowTable) SetLogger(logger *zap.Logger)

func (*ShadowTable) Sqls

func (st *ShadowTable) Sqls() map[string]string

type Sql

type Sql struct {
	Query     string             `json:"query"`
	Args      []interface{}      `json:"args"`
	Explain   []mysql.ExplainRow `json:"explain"`
	AlarmType AlarmType          `json:"alarm_type"`
	Reason    string             `json:"reason"`
	CreatedAt time.Time          `json:"created_at"`
}

Sql sql statement

type SqlRewriter

type SqlRewriter interface {
	RewriterBase
	RewriteSql(sql string) (string, error)
}

type Stmt

type Stmt struct {
	Stmt driver.Stmt
	// contains filtered or unexported fields
}

Stmt implements a database/sql/driver.Stmt

func (*Stmt) Close

func (stmt *Stmt) Close() error

func (*Stmt) Exec

func (stmt *Stmt) Exec(args []driver.Value) (driver.Result, error)

func (*Stmt) ExecContext

func (stmt *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error)

func (*Stmt) NumInput

func (stmt *Stmt) NumInput() int

func (*Stmt) Query

func (stmt *Stmt) Query(args []driver.Value) (driver.Rows, error)

func (*Stmt) QueryContext

func (stmt *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error)

type SyncMap

type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewSyncMap

func NewSyncMap[K comparable, V any]() *SyncMap[K, V]

NewSyncMap creates a new map

func (*SyncMap[K, V]) Delete

func (m *SyncMap[K, V]) Delete(key K)

Set set a V's instance with key, if exists then override

func (*SyncMap[K, V]) Load

func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)

Set set a V's instance with key, if exists then override

func (*SyncMap[K, V]) LoadAndDelete

func (m *SyncMap[K, V]) LoadAndDelete(key K) (value any, loaded bool)

func (*SyncMap[K, V]) LoadOrStore

func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual any, loaded bool)

func (*SyncMap[K, V]) Range

func (m *SyncMap[K, V]) Range(f func(key, value any) bool)

func (*SyncMap[K, V]) Store

func (m *SyncMap[K, V]) Store(key K, value V)

Set set a V's instance with key, if exists then override

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL