ckgroup

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2022 License: MIT Imports: 21 Imported by: 0

README

CkGroup

clickhouse-go 的封装

目的是操作 ClickHouse 集群内的多个节点

特性

  • 插入时在代码层面做 hash , 防止 ClickHouse 的分布式表负担过大
  • 查询方便 , 支持普通查询和流式查询
  • 查询结果可以映射为 struct 或 map
  • 插入失败时有重试机制

例子

具体参考 examples 目录下的代码

insert
imports....

func main() {
	var c = config.Config{
		ShardGroups: []config.ShardGroupConfig{
			{ShardNode: "tcp://localhost:9000", ReplicaNodes: []string{"tcp://localhost:9001"}},
			{ShardNode: "tcp://localhost:9002", ReplicaNodes: []string{"tcp://localhost:9003"}},
		}}
	
	group := ckgroup.MustCKGroup(c)

    users := generateUsers()
    err := group.InsertAuto(`insert into user (id,real_name,city) values (#{id},#{real_name},#{city})`, `id`, users)
	if err != nil {
		fmt.Println(err)
	}
}

query
improt....

type user struct {
	Id       int `db:"id"`
	RealName string `db:"real_name"`
	City     string `db:"city"`
}

func main() {
	var c = config.Config{QueryNode: "clickhouse dns url"}
	group := ckgroup.MustCKGroup(c)

	datas := &[]*user{}
	err := group.GetQueryNode().QueryRows(datas, `select id, real_name, city from user where  city = ?`, "上海")
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := range *datas {
		fmt.Println((*datas)[i])
	}
}

Documentation

Index

Constants

View Source
const (
	DRIVER = "clickhouse"
	DbTag  = "db"
)

Variables

This section is empty.

Functions

func BatchScanRows deprecated

func BatchScanRows(db *sql.DB, ch chan interface{}, dest interface{}, query string, args ...interface{}) error

Deprecated: Use CKConn.QueryStream instead.

func DataChanInsert

func DataChanInsert(db *sql.DB, dataChan chan interface{}, insertSql string, length int) error

Deprecated

Types

type AlterErrDetail

type AlterErrDetail struct {
	Err error
	// 发生错误的shardconn对象
	Conn       ShardConn
	ShardIndex int
}

type CKConn

type CKConn interface {
	GetHost() string
	GetUser() string
	GetRawConn() *sql.DB
	Exec(query string, args ...interface{}) error
	QueryRowNoType(query string, args ...interface{}) (map[string]interface{}, error)
	QueryRowsNoType(query string, args ...interface{}) ([]map[string]interface{}, error)
	QueryRow(v interface{}, query string, args ...interface{}) error
	QueryRows(v interface{}, query string, args ...interface{}) error
	// QueryStream 流式查询 , 利用 chan 来存储查询的数据
	// chanData 类型只能是 chan *sturct 或 chan sturct
	QueryStream(chanData interface{}, query string, args ...interface{}) error
	// Insert
	// query  形如 insert into user (id,real_name,city) values (#{id},#{real_name},#{city}) , #{}内的字符只能是大小写字母,数字和下划线
	// sliceData  要输入的数组 , 类型只能是 []*sturct 或 []struct
	Insert(query string, sliceData interface{}) error
}

func MustCKConn

func MustCKConn(dns string) CKConn

func NewCKConn

func NewCKConn(dns string) (CKConn, error)

type DBGroup

type DBGroup interface {
	GetQueryNode() CKConn
	GetAllNodes() []CKConn
	GetAllShard() []ShardConn

	KeepAlive(intervalSecond int)

	// InsertAuto 自动把数组内的数据根据 siphash 分片插入到各个 clickhouse 节点
	// query  形如 insert into user (id,real_name,city) values (#{id},#{real_name},#{city}) . #{}内的字符只能是大小写字母,数字和下划线
	// hashTag  struct sipHash字段 `db` tag 的值
	// sliceData  要输入的数组 , 类型只能是 []*struct 或 []struct
	InsertAuto(query, hashTag string, sliceData interface{}) error

	// InsertAutoDetail 第一个返回值是详细的错误,第二返回值是参数校验的错误
	InsertAutoDetail(query, hashTag string, sliceData interface{}) ([]InsertErrDetail, error)

	// ExecSerialAll 串行的在所有节点上执行create,drop,kill,detach 等语句
	// onErrContinue 遇到错误后是否继续
	// 第一个返回值是详细的错误,第二返回值是sql校验的错误
	ExecSerialAll(onErrContinue bool, query string, args ...interface{}) ([]ExecErrDetail, error)

	// ExecParallelAll 并行的在所有节点执行create,drop,kill,detach 等语句
	// 第一个返回值是详细的错误,第二返回值是sql校验的错误
	ExecParallelAll(query string, args ...interface{}) ([]ExecErrDetail, error)

	// AlterAuto 在每个shard的一个节点上执行alter语句
	// 第一个返回值是详细的错误,第二返回值是sql校验的错误
	AlterAuto(query string, args ...interface{}) ([]AlterErrDetail, error)

	// Deprecated
	ExecAuto(query string, hashIdx int, args [][]interface{}) error
	// Deprecated
	ExecAll(query string, args [][]interface{}) error
	Close()
}

func MustCKGroup

func MustCKGroup(c config.Config, opts ...OptionFunc) DBGroup

func NewCKGroup

func NewCKGroup(c config.Config, opts ...OptionFunc) (DBGroup, error)

type ExecErrDetail

type ExecErrDetail struct {
	Err error
	// 发生错误的ckconn对象
	Conn CKConn
}

type InsertErrDetail

type InsertErrDetail struct {
	Err        error
	ShardIndex int
	Datas      interface{}
}

type OptionFunc

type OptionFunc func(*option)

func WithGroupInsertLimiter

func WithGroupInsertLimiter(limit rate.Limit, burst int) OptionFunc

func WithRetryNum

func WithRetryNum(retryNum int) OptionFunc

type ShardConn

type ShardConn interface {
	GetAllConn() []CKConn
	GetReplicaConn() []CKConn
	GetShardConn() CKConn

	// Exec 所有节点执行
	Exec(ignoreErr bool, query string, args ...interface{}) []hostErr

	// AlterAuto 随机在一个节点上执行,如果出错自动在下个节点尝试
	AlterAuto(query string, args ...interface{}) error

	// InsertAuto 随机在一个节点上插入,如果出错会自动在下个节点插入
	InsertAuto(query string, sliceData interface{}) error

	Close()
}

func MustShardConn

func MustShardConn(shardIndex int, conf config.ShardGroupConfig) ShardConn

func NewShardConn

func NewShardConn(shardIndex int, conf config.ShardGroupConfig) (ShardConn, error)

Directories

Path Synopsis
dbtesttool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL