README ¶
github.com/liumingmin/goutils
gotuils目标是快速搭建应用的辅助代码库
- ws模块用法
- 常用工具库
- algorithm
- cache 缓存模块
- conf yaml配置模块
- container 容器模块
- db 数据库
- log zap日志库
- middleware 中间件
- net 网络库
- utils 通用工具库
- ws websocket客户端和服务端库
ws模块用法
protoc --go_out=. ws/msg.proto
//js
cd ws
protoc --js_out=import_style=commonjs,binary:js msg.proto
cd js
npm i google-protobuf
npm i -g browserify
browserify msg_pb.js -o msg_pb_dist.js
https://www.npmjs.com/package/google-protobuf
常用工具库
文件 | 说明 |
---|---|
async.go | 带超时异步调用 |
crc16.go | 查表法crc16 |
crc16-kermit.go | 算法实现crc16 |
csv_parse.go | csv解析封装 |
httputils.go | httpClient工具 |
math.go | 数学库 |
models.go | 反射创建对象 |
stringutils.go | 字符串处理 |
struct.go | 结构体工具(拷贝、合并) |
tags.go | 结构体tag工具 |
utils.go | 其他工具类 |
algorithm
crc16_test.go crc16算法
TestCrc16
t.Log(Crc16([]byte("abcdefg")))
descartes_test.go 笛卡尔组合
TestDescartes
result := DescartesCombine([][]string{{"A", "B"}, {"1", "2", "3"}, {"a", "b", "c", "d"}})
for _, item := range result {
t.Log(item)
}
cache 缓存模块
mem_cache_test.go 内存缓存
TestMemCacheFunc
ctx := context.Background()
const cacheKey = "UT:%v:%v"
var lCache = cache.New(5*time.Minute, 5*time.Minute)
result, err := MemCacheFunc(ctx, lCache, 60*time.Second, rawGetFunc0, cacheKey, "p1", "p2")
log.Info(ctx, "%v %v %v", result, err, printKind(result))
_memCacheFuncTestMore(ctx, lCache, cacheKey)
rds_cache_test.go Redis缓存
TestRdscCacheFunc
redisDao.InitRedises()
ctx := context.Background()
const cacheKey = "UT:%v:%v"
const RDSC_DB = "rdscdb"
rds := redisDao.Get(RDSC_DB)
result, err := RdsCacheFunc(ctx, rds, 60, rawGetFunc0, cacheKey, "p1", "p2")
log.Info(ctx, "%v %v %v", result, err, printKind(result))
_rdsDeleteCacheTestMore(ctx, rds, cacheKey)
TestRdsCacheMultiFunc
redisDao.InitRedises()
ctx := context.Background()
const RDSC_DB = "rdscdb"
rds := redisDao.Get(RDSC_DB)
result, err := RdsCacheMultiFunc(ctx, rds, 30, getThingsByIds, "multikey:%s", []string{"1", "2", "5", "3", "4", "10"})
if err == nil && result != nil {
mapValue, ok := result.(map[string]*Thing)
if ok {
for key, value := range mapValue {
log.Info(ctx, "%v===%v", key, value)
}
}
}
conf yaml配置模块
container 容器模块
bitmap_test.go 比特位表
TestBitmapExists
bitmap := initTestData()
t.Log(bitmap)
t.Log(bitmap.Exists(122))
t.Log(bitmap.Exists(123))
//data1 := []byte{1, 2, 4, 7}
//data2 := []byte{0, 1, 5}
TestBitmapSet
bitmap := initTestData()
t.Log(bitmap.Exists(1256))
bitmap.Set(1256)
t.Log(bitmap.Exists(1256))
TestBitmapUnionOr
bitmap := initTestData()
bitmap2 := initTestData()
bitmap2.Set(256)
bitmap3 := bitmap.Union(&bitmap2)
t.Log(bitmap3.Exists(256))
bitmap3.Set(562)
t.Log(bitmap3.Exists(562))
t.Log(bitmap.Exists(562))
TestBitmapBitInverse
bitmap := initTestData()
t.Log(bitmap.Exists(66))
bitmap.Inverse()
t.Log(bitmap.Exists(66))
const_hash_test.go 一致性HASH
TestConstHash
var ringchash CHashRing
var configs []CHashNode
for i := 0; i < 10; i++ {
configs = append(configs, TestNode("node"+strconv.Itoa(i)))
}
ringchash.Adds(configs)
fmt.Println(ringchash.Debug())
fmt.Println("==================================", configs)
fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))
fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))
fmt.Println(ringchash.Get("xvd:fs:xcvd"))
var configs2 []CHashNode
for i := 0; i < 2; i++ {
configs2 = append(configs2, TestNode("node"+strconv.Itoa(10+i)))
}
ringchash.Adds(configs2)
fmt.Println("==================================")
fmt.Println(ringchash.Debug())
fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))
fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))
fmt.Println(ringchash.Get("xvd:fs:xcvd"))
ringchash.Del("node0")
fmt.Println("==================================")
fmt.Println(ringchash.Debug())
fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))
fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))
fmt.Println(ringchash.Get("xvd:fs:xcvd"))
lighttimer_test.go 轻量级计时器
TestStartTicks
lt := NewLightTimer()
lt.StartTicks(time.Millisecond)
lt.AddTimer(time.Second*time.Duration(2), func(fireSeqNo uint) bool {
fmt.Println("callback", fireSeqNo, "-")
if fireSeqNo == 4 {
return true
}
return false
})
time.Sleep(time.Hour)
TestStartTicksDeadline
//NewLightTimerPool
lt := NewLightTimer()
lt.StartTicks(time.Millisecond)
lt.AddTimerWithDeadline(time.Second*time.Duration(2), time.Now().Add(time.Second*5), func(seqNo uint) bool {
fmt.Println("callback", seqNo, "-")
if seqNo == 4 {
return true
}
return false
}, func(seqNo uint) bool {
fmt.Println("end callback", seqNo, "-")
return true
})
time.Sleep(time.Hour)
TestLtPool
pool := NewLightTimerPool(10, time.Millisecond)
for i := 0; i < 100000; i++ {
tmp := i
pool.AddTimerWithDeadline(strconv.Itoa(tmp), time.Second*time.Duration(2), time.Now().Add(time.Second*5), func(seqNo uint) bool {
fmt.Println("callback", tmp, "-", seqNo, "-")
if seqNo == 4 {
return true
}
return false
}, func(seqNo uint) bool {
fmt.Println("end callback", tmp, "-", seqNo, "-")
return true
})
}
time.Sleep(time.Second * 20)
fmt.Println(runtime.NumGoroutine())
time.Sleep(time.Hour)
TestStartTicks2
lt := NewLightTimer()
lt.StartTicks(time.Millisecond)
lt.AddCallback(time.Second*time.Duration(3), func() {
fmt.Println("invoke once")
})
time.Sleep(time.Hour)
db 数据库
elasticsearch ES搜索引擎
es6 ES6版本API
es_test.go
TestCreateIndexByModel
InitClients()
client := GetEsClient(testUserIndexKey)
err := client.CreateIndexByModel(context.Background(), testUserIndexName, &MappingModel{
Mappings: map[string]Mapping{
testUserTypeName: {
Dynamic: false,
Properties: map[string]*elasticsearch.MappingProperty{
"userId": {
Type: "text",
Index: false,
},
"nickname": {
Type: "text",
Analyzer: "standard",
Fields: map[string]*elasticsearch.MappingProperty{
"std": {
Type: "text",
Analyzer: "standard",
ExtProps: map[string]interface{}{
"term_vector": "with_offsets",
},
},
"keyword": {
Type: "keyword",
},
},
},
"status": {
Type: "keyword",
},
"pType": {
Type: "keyword",
},
},
},
},
Settings: elasticsearch.MappingSettings{
SettingsIndex: elasticsearch.SettingsIndex{
IgnoreMalformed: true,
NumberOfReplicas: 1,
NumberOfShards: 3,
},
},
})
t.Log(err)
TestEsInsert
InitClients()
client := GetEsClient(testUserIndexKey)
for i := 0; i < 100; i++ {
ptype := "normal"
if i%10 == 5 {
ptype = "vip"
}
status := "valid"
if i%30 == 2 {
status = "invalid"
}
id := "000000000" + fmt.Sprint(i)
err := client.Insert(context.Background(), testUserIndexName, testUserTypeName,
id, testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
t.Log(err)
}
TestEsBatchInsert
InitClients()
client := GetEsClient(testUserIndexKey)
ids := make([]string, 0)
items := make([]interface{}, 0)
for i := 0; i < 100; i++ {
ptype := "normal"
if i%10 == 5 {
ptype = "vip"
}
status := "valid"
if i%30 == 2 {
status = "invalid"
}
id := "x00000000" + fmt.Sprint(i)
ids = append(ids, id)
items = append(items, &testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
}
err := client.BatchInsert(context.Background(), testUserIndexName, testUserTypeName, ids, items)
t.Log(err)
TestEsUpdateById
InitClients()
client := GetEsClient(testUserIndexKey)
id := "000000000" + fmt.Sprint(30)
err := client.UpdateById(context.Background(), testUserIndexName, testUserTypeName,
id, map[string]interface{}{
"status": "invalid",
})
t.Log(err)
err = client.UpdateById(context.Background(), testUserIndexName, testUserTypeName,
id, map[string]interface{}{
"extField": "ext1234",
})
t.Log(err)
TestDeleteById
InitClients()
client := GetEsClient(testUserIndexKey)
id := "000000000" + fmt.Sprint(9)
err := client.DeleteById(context.Background(), testUserIndexName, testUserTypeName,
id)
t.Log(err)
TestQueryEs
InitClients()
client := GetEsClient(testUserIndexKey)
bq := elastic.NewBoolQuery()
bq.Must(elastic.NewMatchQuery("nickname", "超级棒"))
var users []testUser
total := int64(0)
err := client.FindByModel(context.Background(), elasticsearch.QueryModel{
IndexName: testUserIndexName,
TypeName: testUserTypeName,
Query: bq,
Size: 5,
Results: &users,
Total: &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestQueryEsQuerySource
InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
"from":0,
"size":25,
"query":{
"match":{"nickname":"超级"}
}
}`
var users []testUser
total := int64(0)
err := client.FindBySource(context.Background(), elasticsearch.SourceModel{
IndexName: testUserIndexName,
TypeName: testUserTypeName,
Source: source,
Results: &users,
Total: &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestAggregateBySource
InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
"from": 0,
"size": 0,
"_source": {
"includes": [
"status",
"pType",
"COUNT"
],
"excludes": []
},
"stored_fields": [
"status",
"pType"
],
"aggregations": {
"status": {
"terms": {
"field": "status",
"size": 200,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"pType": {
"terms": {
"field": "pType",
"size": 10,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"statusCnt": {
"value_count": {
"field": "_index"
}
}
}
}
}
}
}
}`
var test AggregationTest
client.AggregateBySource(context.Background(), elasticsearch.AggregateModel{
IndexName: testUserIndexName,
TypeName: testUserTypeName,
Source: source,
AggKeys: []string{"status"},
}, &test)
t.Log(test)
es7 ES7版本API
es_test.go
TestCreateIndexByModel
InitClients()
client := GetEsClient(testUserIndexKey)
err := client.CreateIndexByModel(context.Background(), testUserIndexName, &MappingModel{
Mapping: Mapping{
Dynamic: false,
Properties: map[string]*elasticsearch.MappingProperty{
"userId": {
Type: "text",
Index: false,
},
"nickname": {
Type: "text",
Analyzer: "standard",
Fields: map[string]*elasticsearch.MappingProperty{
"std": {
Type: "text",
Analyzer: "standard",
ExtProps: map[string]interface{}{
"term_vector": "with_offsets",
},
},
"keyword": {
Type: "keyword",
},
},
},
"status": {
Type: "keyword",
},
"pType": {
Type: "keyword",
},
},
},
Settings: elasticsearch.MappingSettings{
SettingsIndex: elasticsearch.SettingsIndex{
IgnoreMalformed: true,
NumberOfReplicas: 2,
NumberOfShards: 3,
},
},
})
t.Log(err)
TestEsInsert
InitClients()
client := GetEsClient(testUserIndexKey)
for i := 0; i < 100; i++ {
ptype := "normal"
if i%10 == 5 {
ptype = "vip"
}
status := "valid"
if i%30 == 2 {
status = "invalid"
}
id := "000000000" + fmt.Sprint(i)
err := client.Insert(context.Background(), testUserIndexName,
id, testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
t.Log(err)
}
TestEsBatchInsert
InitClients()
client := GetEsClient(testUserIndexKey)
ids := make([]string, 0)
items := make([]interface{}, 0)
for i := 0; i < 100; i++ {
ptype := "normal"
if i%10 == 5 {
ptype = "vip"
}
status := "valid"
if i%30 == 2 {
status = "invalid"
}
id := "x00000000" + fmt.Sprint(i)
ids = append(ids, id)
items = append(items, &testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
}
err := client.BatchInsert(context.Background(), testUserIndexName, ids, items)
t.Log(err)
TestEsUpdateById
InitClients()
client := GetEsClient(testUserIndexKey)
id := "000000000" + fmt.Sprint(30)
err := client.UpdateById(context.Background(), testUserIndexName,
id, map[string]interface{}{
"status": "invalid",
})
t.Log(err)
err = client.UpdateById(context.Background(), testUserIndexName,
id, map[string]interface{}{
"extField": "ext1234",
})
t.Log(err)
TestDeleteById
InitClients()
client := GetEsClient(testUserIndexKey)
id := "000000000" + fmt.Sprint(9)
err := client.DeleteById(context.Background(), testUserIndexName, id)
t.Log(err)
TestQueryEs
InitClients()
client := GetEsClient(testUserIndexKey)
bq := elastic.NewBoolQuery()
bq.Must(elastic.NewMatchQuery("nickname", "超级棒"))
var users []testUser
total := int64(0)
err := client.FindByModel(context.Background(), elasticsearch.QueryModel{
IndexName: testUserIndexName,
Query: bq,
Size: 5,
Results: &users,
Total: &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestQueryEsQuerySource
InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
"from":0,
"size":25,
"query":{
"match":{"nickname":"超级"}
}
}`
var users []testUser
total := int64(0)
err := client.FindBySource(context.Background(), elasticsearch.SourceModel{
IndexName: testUserIndexName,
Source: source,
Results: &users,
Total: &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestAggregateBySource
InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
"from": 0,
"size": 0,
"_source": {
"includes": [
"status",
"pType",
"COUNT"
],
"excludes": []
},
"stored_fields": [
"status",
"pType"
],
"aggregations": {
"status": {
"terms": {
"field": "status",
"size": 200,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"pType": {
"terms": {
"field": "pType",
"size": 10,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"statusCnt": {
"value_count": {
"field": "_index"
}
}
}
}
}
}
}
}`
var test AggregationTest
client.AggregateBySource(context.Background(), elasticsearch.AggregateModel{
IndexName: testUserIndexName,
Source: source,
AggKeys: []string{"status"},
}, &test)
t.Log(test)
kafka kafka消息队列
kafka_test.go
TestKafkaProducer
InitKafka()
producer := GetProducer("user_producer")
producer.Produce(&sarama.ProducerMessage{
Topic: userTopic,
Key: sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
Value: sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
})
time.Sleep(time.Second * 5)
TestKafkaConsumer
InitKafka()
consumer := GetConsumer("user_consumer")
go func() {
consumer.Consume(userTopic, func(msg *sarama.ConsumerMessage) error {
fmt.Println(string(msg.Key), "=", string(msg.Value))
return nil
}, func(err error) {
})
}()
producer := GetProducer("user_producer")
for i := 0; i < 10; i++ {
producer.Produce(&sarama.ProducerMessage{
Topic: userTopic,
Key: sarama.ByteEncoder(fmt.Sprint(i)),
Value: sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
})
}
time.Sleep(time.Second * 5)
mongo mongo数据库
collection_test.go
TestInsert
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
op.Insert(ctx, testUser{
UserId: "1",
Nickname: "超级棒",
Status: "valid",
Type: "normal",
})
var result interface{}
op.FindOne(ctx, FindModel{
Query: bson.M{"user_id": "1"},
Results: &result,
})
log.Info(ctx, "result: %v", result)
TestUpdate
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
op.Update(ctx, bson.M{"user_id": "1"}, bson.M{"$set": bson.M{"nick_name": "超级棒++"}})
var result interface{}
op.FindOne(ctx, FindModel{
Query: bson.M{"user_id": "1"},
Results: &result,
})
log.Info(ctx, "result: %v", result)
TestFind
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
var result []bson.M
err := op.Find(ctx, FindModel{
Query: bson.M{"user_id": "1"},
Results: &result,
})
if err != nil {
log.Error(ctx, "Mgo find err: %v", err)
return
}
for _, item := range result {
t.Log(item)
}
TestDelete
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
op.Delete(ctx, bson.M{"user_id": "1"})
var result interface{}
op.FindOne(ctx, FindModel{
Query: bson.M{"user_id": "1"},
Results: &result,
})
log.Info(ctx, "result: %v", result)
TestUpert
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
err := op.Upsert(ctx, bson.M{"name": "tom2"}, bson.M{"$set": bson.M{"birth": "2020"}}, bson.M{"birth2": "2024"})
t.Log(err)
TestBulkUpdateItems
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
err := op.BulkUpdateItems(ctx, []*BulkUpdateItem{
{Selector: bson.M{"name": "tom"}, Update: bson.M{"$set": bson.M{"birth": "1"}}},
{Selector: bson.M{"name": "tom1"}, Update: bson.M{"$set": bson.M{"birth2": "2"}}},
})
t.Log(err)
TestBulkUpsertItems
ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)
op := NewCompCollectionOp(c, dbName, collectionName)
err := op.BulkUpsertItem(ctx, []*BulkUpsertItem{
{Selector: bson.M{"name": "tim"}, Replacement: bson.M{"name": "tim", "birth": "3"}},
{Selector: bson.M{"name": "tim1"}, Replacement: bson.M{"name": "tim1", "birth2": "4"}},
})
t.Log(err)
redis go-redis
list_test.go Redis List工具库
TestList
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
err := ListPush(ctx, rds, "test_list", "stringvalue")
t.Log(err)
ListPop(rds, []string{"test_list"}, 3600, 1000, func(key, data string) {
fmt.Println(key, data)
})
err = ListPush(ctx, rds, "test_list", "stringvalue")
t.Log(err)
time.Sleep(time.Second * 20)
lock_test.go Redis 锁工具库
TestRdsAllowActionWithCD
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
cd, ok := RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
cd, ok = RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
time.Sleep(time.Second * 3)
cd, ok = RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
TestRdsAllowActionByMTs
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
cd, ok := RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
cd, ok = RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
time.Sleep(time.Second)
cd, ok = RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
TestRdsLockResWithCD
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
ok := RdsLockResWithCD(ctx, rds, "test:res", "res-1", 3)
t.Log(ok)
ok = RdsLockResWithCD(ctx, rds, "test:res", "res-2", 3)
t.Log(ok)
time.Sleep(time.Second * 4)
ok = RdsLockResWithCD(ctx, rds, "test:res", "res-2", 3)
t.Log(ok)
mq_test.go Redis PubSub工具库
TestMqPSubscribe
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
MqPSubscribe(ctx, rds, "testkey:*", func(channel string, data string) {
fmt.Println(channel, data)
}, 10)
err := MqPublish(ctx, rds, "testkey:1", "id:1")
t.Log(err)
err = MqPublish(ctx, rds, "testkey:2", "id:2")
t.Log(err)
err = MqPublish(ctx, rds, "testkey:3", "id:3")
t.Log(err)
time.Sleep(time.Second * 3)
zset_test.go Redis ZSet工具库
TestZDescartes
InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
dimValues := [][]string{{"dim1a", "dim1b"}, {"dim2a", "dim2b", "dim2c", "dim2d"}, {"dim3a", "dim3b", "dim3c"}}
dt, err := csv.ReadCsvToDataTable(ctx, "data.csv", ',',
[]string{"id", "name", "createtime", "dim1", "dim2", "dim3", "member"}, "id", []string{})
if err != nil {
t.Log(err)
return
}
err = ZDescartes(ctx, rds, dimValues, func(strs []string) (string, map[string]int64) {
dimData := make(map[string]int64)
for _, row := range dt.Rows() {
if row.String("dim1") == strs[0] &&
row.String("dim2") == strs[1] &&
row.String("dim3") == strs[2] {
dimData[row.String("member")] = row.Int64("createtime")
}
}
return "rds" + strings.Join(strs, "-"), dimData
}, 1000, 30)
t.Log(err)
log zap日志库
zap_test.go
TestZap
ctx := &gin.Context{}
ctx.Set("__traceId", "aaabbbbbcccc")
//Info(ctx, "我是日志", "name", "管理员") //json
Info(ctx, "我是日志2")
//Info(ctx, "我是日志3", "name") //json
Error(ctx, "我是日志4: %v,%v", "管理员", "eee")
TestZapJson
ctx := &gin.Context{}
ctx.Set("__traceId", "aaabbbbbcccc")
Info(ctx, "我是日志 %v", "name", "管理员") //json
Info(ctx, "我是日志3 %v", "管理员") //json
Error(ctx, "我是日志3") //json
Log(ctx, zapcore.ErrorLevel, "日志啊")
TestPanicLog
testPanicLog()
Info(context.Background(), "catch panic")
TestLevelChange
traceId := strings.Replace(uuid.New().String(), "-", "", -1)
ctx := context.WithValue(context.Background(), LOG_TRADE_ID, traceId)
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
fmt.Println(LogLess(), "============")
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
fmt.Println(LogMore(), "============")
middleware 中间件
limit_conn_test.go 限连接模块
TestLimitConn
router := gin.New()
lr := NewLimitConn(reqHostIp)
router.Use(lr.Incoming(nil, 10, 4))
router.GET("/testurl", func(c *gin.Context) {
time.Sleep(time.Second)
fmt.Println("enter")
c.String(http.StatusOK, "ok!!")
}, lr.Leaving(nil))
safego.Go(func() {
router.Run(":8081")
})
time.Sleep(time.Second * 3)
for j := 0; j < 5; j++ {
time.Sleep(time.Second * 1)
for i := 0; i < 20; i++ {
safego.Go(func() {
resp, err := http.Get("http://127.0.0.1:8081/testurl")
if err != nil {
fmt.Println(err)
} else {
if 200 != resp.StatusCode {
fmt.Println("点击太快了", resp.StatusCode)
}
}
})
}
}
//w1 := utils.PerformTestRequest("GET", "/testurl", router)
//if 200 == w1.Code {
// fmt.Println("okk")
//}
time.Sleep(time.Minute * 20)
limit_req_test.go 限流模块
TestLimitReq
router := gin.New()
lr := NewLimitReq(reqHostIp)
router.Use(lr.Incoming(nil, 10, 4))
router.GET("/testurl", func(c *gin.Context) {
time.Sleep(time.Second)
fmt.Println("enter")
c.String(http.StatusOK, "ok!!")
})
safego.Go(func() {
router.Run(":8080")
})
time.Sleep(time.Second * 3)
for j := 0; j < 5; j++ {
time.Sleep(time.Second * 1)
for i := 0; i < 20; i++ {
safego.Go(func() {
resp, err := http.Get("http://127.0.0.1:8080/testurl")
if err != nil {
fmt.Println(err)
} else {
if 200 != resp.StatusCode {
fmt.Println("点击太快了", resp.StatusCode)
}
}
})
}
}
//w1 := utils.PerformTestRequest("GET", "/testurl", router)
//if 200 == w1.Code {
// fmt.Println("okk")
//}
time.Sleep(time.Minute * 20)
service_handler_test.go service封装器
TestServiceHandler
router := gin.New()
router.POST("/foo", ServiceHandler(serviceFoo, fooReq{}, nil))
router.Run(":8080")
net 网络库
httpx 兼容http1.x和2.0的httpclient
httpclientx_test.go
TestHttpXGet
clientX := getHcx()
for i := 0; i < 3; i++ {
resp, err := clientX.Get("http://127.0.0.1:8049")
if err != nil {
t.Fatal(fmt.Errorf("error making request: %v", err))
}
t.Log(resp.StatusCode)
t.Log(resp.Proto)
}
TestHttpXPost
clientX := getHcx()
for i := 0; i < 3; i++ {
resp, err := clientX.Get("http://127.0.0.1:8881")
if err != nil {
t.Fatal(fmt.Errorf("error making request: %v", err))
}
t.Log(resp.StatusCode)
t.Log(resp.Proto)
}
ip
packet tcp包model
proxy ssh proxy
ssh_client_test.go
TestSshClient
client := getSshClient(t)
defer client.Close()
session, err := client.NewSession()
if err != nil {
t.Fatalf("Create session failed %v", err)
}
defer session.Close()
// run command and capture stdout/stderr
output, err := session.CombinedOutput("ls -l /data")
if err != nil {
t.Fatalf("CombinedOutput failed %v", err)
}
t.Log(string(output))
TestMysqlSshClient
client := getSshClient(t)
defer client.Close()
//test时候,打开,会引入mysql包
//mysql.RegisterDialContext("tcp", func(ctx context.Context, addr string) (net.Conn, error) {
// return client.Dial("tcp", addr)
//})
db, err := sql.Open("", "")
if err != nil {
t.Fatalf("open db failed %v", err)
}
defer db.Close()
rs, err := db.Query("select limit 10")
if err != nil {
t.Fatalf("open db failed %v", err)
}
defer rs.Close()
for rs.Next() {
}
serverx 兼容http1.x和2.0的http server
utils 通用工具库
buffer_invoker 异步调用
buffer_invoker_test.go
TestFuncBuffer
for i := 0; i < 100; i++ {
item := strconv.Itoa(i)
safego.Go(func() {
fb.Invoke("1234", item)
})
}
fmt.Println("for end1")
time.Sleep(time.Second * 10)
for i := 0; i < 100; i++ {
item := strconv.Itoa(i)
safego.Go(func() {
fb.Invoke("1234", item)
})
}
fmt.Println("for end2")
time.Sleep(time.Second * 60)
cbk 熔断器
cbk_test.go
TestCbkFailed
InitCbk()
var ok bool
var lastBreaked bool
for j := 0; j < 200; j++ {
i := j
//safego.Go(func() {
err := Impls[SIMPLE].Check("test") //30s 返回一次true尝试
fmt.Println(i, "Check:", ok)
if err == nil {
time.Sleep(time.Millisecond * 10)
Impls[SIMPLE].Failed("test")
if i > 105 && lastBreaked {
Impls[SIMPLE].Succeed("test")
lastBreaked = false
fmt.Println(i, "Succeed")
}
} else {
if lastBreaked {
time.Sleep(time.Second * 10)
} else {
lastBreaked = true
}
}
//})
}
csv CSV文件解析为MDB内存表
csv_parse_test.go
TestReadCsvToDataTable
dt, err := ReadCsvToDataTable(context.Background(), `goutils.log`, '\t',
[]string{"xx", "xx", "xx", "xx"}, "xxx", []string{"xxx"})
if err != nil {
t.Log(err)
return
}
for _, r := range dt.Rows() {
t.Log(r.Data())
}
rs := dt.RowsBy("xxx", "869")
for _, r := range rs {
t.Log(r.Data())
}
t.Log(dt.Row("17"))
distlock 分布式锁
consullock_test.go
TestAquireConsulLock
l, _ := NewConsulLock("accountId", 10)
//l.Lock(15)
//l.Unlock()
ctx := context.Background()
fmt.Println("try lock 1")
fmt.Println(l.Lock(ctx, 5))
//time.Sleep(time.Second * 6)
//fmt.Println("try lock 2")
//fmt.Println(l.Lock(3))
l2, _ := NewConsulLock("accountId", 10)
fmt.Println("try lock 3")
fmt.Println(l2.Lock(ctx, 15))
l3, _ := NewConsulLock("accountId", 10)
fmt.Println("try lock 4")
fmt.Println(l3.Lock(ctx, 15))
time.Sleep(time.Minute)
filelock_test.go
TestFileLock
test_file_path, _ := os.Getwd()
locked_file := test_file_path
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(num int) {
flock := NewFileLock(locked_file, false)
err := flock.Lock()
if err != nil {
wg.Done()
fmt.Println(err.Error())
return
}
fmt.Printf("output : %d\n", num)
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(2 * time.Second)
rdslock_test.go
TestRdsLock
redis.InitRedises()
l, _ := NewRdsLuaLock("rdscdb", "accoutId", 4)
l2, _ := NewRdsLuaLock("rdscdb", "accoutId", 4)
//l.Lock(15)
//l.Unlock()
ctx := context.Background()
fmt.Println(l.Lock(ctx, 5))
fmt.Println("1getlock")
fmt.Println(l2.Lock(ctx, 5))
fmt.Println("2getlock")
time.Sleep(time.Second * 15)
//l2, _ := NewRdsLuaLock("accoutId", 15)
//t.Log(l2.Lock(5))
docgen 文档自动生成
cmd
docgen_test.go
TestGenDocTestUser
sb := strings.Builder{}
sb.WriteString(genDocTestUserQuery())
sb.WriteString(genDocTestUserCreate())
sb.WriteString(genDocTestUserUpdate())
sb.WriteString(genDocTestUserDelete())
GenDoc(context.Background(), "用户管理", "doc/testuser.md", 2, sb.String())
fsm 有限状态机
hc httpclient工具
ismtp 邮件工具
ismtp_test.go
TestSendEmail
emailauth := LoginAuth(
"from",
"xxxxxx",
"mailhost.com",
)
ctype := fmt.Sprintf("Content-Type: %s; charset=%s", "text/plain", "utf-8")
msg := fmt.Sprintf("To: %s\r\nCc: %s\r\nFrom: %s\r\nSubject: %s\r\n%s\r\n\r\n%s",
strings.Join([]string{"target@mailhost.com"}, ";"),
"",
"from@mailhost.com",
"测试",
ctype,
"测试")
err := SendMail("mailhost.com:port", //convert port number from int to string
emailauth,
"from@mailhost.com",
[]string{"target@mailhost.com"},
[]byte(msg),
)
if err != nil {
t.Log(err)
return
}
return
safego 安全的go协程
snowflake
snowflake_test.go 雪花ID生成器
TestSnowflake
n, _ := NewNode(1)
t.Log(n.Generate(), ",", n.Generate(), ",", n.Generate())
tags_test.go 结构体TAG生成器
TestAutoGenTags
fmt.Println(AutoGenTags(testUser{}, map[string]TAG_STYLE{
"json": TAG_STYLE_SNAKE,
"bson": TAG_STYLE_UNDERLINE,
"form": TAG_STYLE_ORIG,
}))
ws websocket客户端和服务端库
js
ws websocket客户端和服务端库
wss_test.go
TestWssRun
InitServer() //server invoke 服务端调用
InitClient() //client invoke 客户端调用
ctx := context.Background()
const (
C2S_REQ = 1
S2C_RESP = 2
)
//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection *Connection, message *Message) error {
log.Info(ctx, "server recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
packet := GetPoolMessage(S2C_RESP)
packet.PMsg().Data = []byte("server response")
connection.SendMsg(ctx, packet, nil)
return nil
})
//server start
e := gin.New()
e.GET("/join", func(ctx *gin.Context) {
connMeta := ConnectionMeta{
UserId: ctx.DefaultQuery("uid", ""),
Typed: 0,
DeviceId: "",
Version: 0,
Charset: 0,
}
_, err := AcceptGin(ctx, connMeta, DebugOption(true),
SrvUpgraderCompressOption(true),
CompressionLevelOption(2),
ConnEstablishHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn establish: %v", conn.Id())
}),
ConnClosingHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn closing: %v", conn.Id())
}),
ConnClosedHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn closed: %v", conn.Id())
}))
if err != nil {
log.Error(ctx, "Accept client connection failed. error: %v", err)
return
}
})
go e.Run(":8003")
//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection *Connection, message *Message) error {
log.Info(ctx, "client recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
return nil
})
//client connect
uid := "100"
url := "ws://127.0.0.1:8003/join?uid=" + uid
conn, _ := DialConnect(context.Background(), url, http.Header{},
DebugOption(true),
ClientIdOption("server1"),
ClientDialWssOption(url, false),
ClientDialCompressOption(true),
CompressionLevelOption(2),
ConnEstablishHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn establish: %v", conn.Id())
}),
ConnClosingHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn closing: %v", conn.Id())
}),
ConnClosedHandlerOption(func(conn *Connection) {
log.Info(context.Background(), "conn closed: %v", conn.Id())
}),
)
log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)
packet := GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request")
conn.SendMsg(context.Background(), packet, nil)
//time.Sleep(time.Second * 20)
//conn.KickServer(false)
time.Sleep(time.Minute * 1)
Documentation ¶
There is no documentation for this package.
Directories ¶
Path | Synopsis |
---|---|
db
|
|
net
|
|
ismtp
Package smtp implements the Simple Mail Transfer Protocol as defined in RFC 5321.
|
Package smtp implements the Simple Mail Transfer Protocol as defined in RFC 5321. |
snowflake
Package snowflake provides a very simple Twitter snowflake generator and parser.
|
Package snowflake provides a very simple Twitter snowflake generator and parser. |
Click to show internal directories.
Click to hide internal directories.