maudit

command module
v0.0.0-...-f5108b3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: MIT Imports: 10 Imported by: 0

README

审计中心

基础: 消息队列

package kafka_test

import (
	"context"
	"fmt"
	"log"
	"net"
	"strconv"
	"testing"

	"github.com/segmentio/kafka-go"
)

func TestCreateTopic(t *testing.T) {
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	err = controllerConn.CreateTopics(kafka.TopicConfig{Topic: "topic-A", NumPartitions: 6, ReplicationFactor: 1})

	if err != nil {
		t.Fatal(err)
	}
}

func TestPublishMessage(t *testing.T) {
	// make a writer that produces to topic-A, using the least-bytes distribution
	w := &kafka.Writer{
		Addr: kafka.TCP("localhost:9092"),
		// NOTE: When Topic is not defined here, each Message must define it instead.
		Topic:    "topic-A",
		Balancer: &kafka.LeastBytes{},
		// The topic will be created if it is missing.
		AllowAutoTopicCreation: false,
		// 支持消息压缩
		// Compression: kafka.Snappy,
		// 支持TLS
		// Transport: &kafka.Transport{
		//     TLS: &tls.Config{},
		// }
	}

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			// 支持 Writing to multiple topics
			//  NOTE: Each Message has Topic defined, otherwise an error is returned.
			// Topic: "topic-A",
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("Two!"),
		},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

func TestConsumer(t *testing.T) {
	// make a new reader that consumes from topic-A
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		// Consumer Groups, 不指定就是普通的一个Consumer
		GroupID: "consumer-group-id",
		// 可以指定Partition消费消息
		// Partition: 0,
		Topic:    "topic-A",
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

		// 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态
		// FetchMessage() / CommitMessages(ctx, m) 分段提交
		// if err := r.CommitMessages(ctx, m); err != nil {
		//     log.Fatal("failed to commit messages:", err)
		// }
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

如何在项目中使用与封装

Controller如何读取自定义配置
type impl struct {
	col *mongo.Collection
	log *zerolog.Logger
	kr  *go_kafka.Reader

	event.UnimplementedServiceServer
	ioc.ObjectImpl

	// 事件存储在哪些topic上面  需要配置
	// 因此下面是这个控制器实现的具体配置
	// 这些配置什么适合加载: 通过ioc, 如果我们通过toml来配置, 配key叫什么: Name()
	// # 关于event 模块的配置
	// [event]
	// group_id="maudit.group.id"
	// topics=["maudit.event"]
	GroupId string   `toml:"group_id" json:"group_id" yaml:"group_id" env:"EVNET_GROUP_ID"`
	Topics  []string `toml:"topics" json:"topics" yaml:"topics" env:"EVNET_TOPICS" envSeparator:","`
}

kafka配置: ioc 管理这kafka配置

type Kafka struct {
	Brokers        []string       `toml:"brokers" json:"brokers" yaml:"brokers"  env:"KAFKA_BROKERS"`
	ScramAlgorithm ScramAlgorithm `toml:"scram_algorithm" json:"scram_algorithm" yaml:"scram_algorithm"  env:"KAFKA_SCRAM_ALGORITHM"`
	UserName       string         `toml:"username" json:"username" yaml:"username"  env:"KAFKA_USERNAME"`
	Password       string         `toml:"password" json:"password" yaml:"password"  env:"KAFKA_PASSWORD"`

	mechanism sasl.Mechanism
	ioc.ObjectImpl
}
# kafka没有开启认证
[kafka]
username=""

# 关于event 模块的配置
[event]
group_id="maudit.group.id"
topics=["maudit.event"]
在Init时 运行
// 在Init方法执行之前, 会自动加载配置
func (i *impl) Init() error {
	i.log = logger.Sub(i.Name())

	...

	i.log.Debug().Msgf("group_id: %s, topics: %s", i.GroupId, i.Topics)
	// 使用ioc提供的kafka的配置信息 初始化一个reader
	i.kr = kafka.ConsumerGroup(i.GroupId, i.Topics)

	// 没有启动消息消费
	go i.ConsumerEvent()
	return nil
}
问题处理
# clone git checkout v1.9.29
mcube: v1.9.29

把mcube库放到 workspace

go work use ./mcube

如何基于kafak封装 maudit中间件

编写中间件
func NewAuditor() restful.FilterFunction {
	topic := os.Getenv("MAUDIT_EVENT_TOPIC")
	if topic == "" {
		topic = "maudit.event"
	}

	return (&auditor{
		// 引入中间件后,通过配置环境变量接入
		w: ioc_kafka.Producer(topic),
		l: logger.Sub("auditor"),
	}).GoRestfulAuthFunc
}

// 用于接入审计中心的中间件
type auditor struct {
	// 集成kafka writer
	w *kafka.Writer
	l *zerolog.Logger
}

func (a *auditor) GoRestfulAuthFunc(
	req *restful.Request,
	resp *restful.Response,
	next *restful.FilterChain) {

	//  从路由当做获取
	// 请求拦截, 权限检查
	entry := endpoint.NewEntryFromRestRequest(req)

	// 由于审计有性能开销
	// 开启认证
	// Metadata(label.Auth, label.Enable).
	// 开启审计
	// Metadata(label.Audit, label.Enable).

	if entry.AuthEnable && entry.AuditLog {
		// 从认证中间件后, 取消认证后的上下文
		obj := req.Attribute(token.TOKEN_ATTRIBUTE_NAME)
		if obj == nil {
			return
		}
		tk := obj.(*token.Token)

		event := &event.OperateEventData{
			UserName:     tk.Username,
			ServiceName:  application.App().AppName,
			ResourceType: entry.Resource,
			Action:       entry.Labels["action"],
			FeaturePath:  entry.Path,
			Request:      pretty.ToJSON(req.Request),
			Response:     "{}",
		}
		err := a.w.WriteMessages(context.Background(),
			kafka.Message{
				Value: []byte(pretty.ToJSON(event)),
			},
		)
		if err != nil {
			a.l.Error().Msgf("write audit log error, %s", err)
		}
	}
}
测试中间件

使用审计中间件 让cmdb服务接入审计中心

  1. 应用添加中间件
// 启动之前配置钩子
// app 在启动的时候 能不能加载一种中间件逻辑, 比如认证中间件
application.App().HTTP.RouterBuildConfig.BeforeLoad = func(h http.Handler) {
	// 断言 router
	r := h.(*restful.Container)
	// 添加中间件,接入到用户中心
	r.Filter(middleware.RestfulServerInterceptor())
	// 接入 审计中间
	r.Filter(middlewares.NewAuditor())
}
  1. 开启接口审计
// 开启审计
Metadata(label.Audit, label.Enable).
  1. 启动服务测试接口
  • mcenter
  • maudit
  • cmdb

请求cmdb的secrets 接口

GET /cmdb/api/v1/secret HTTP/1.1
Host: 127.0.0.1:8020
Authorization: Bearer mJRdh9W2DDNs9T7oDpz4OOC4

maudit就会收到应用的审计日志

2023-12-16T10:30:36+08:00 DEBUG  event/impl/consumer.go:17 > message at topic/partition/offset maudit.event/0/17:  = {
  "session": "",
  "account": "",
  "user_name": "admin",
  "user_type": "",
  "user_domain": "",
  "service_name": "cmdb",
  "feature_path": "/cmdb/api/v1/secret/",
  "resource_type": "secrets",
  "action": "list",
  "cost": 0,
  "request": "\u0026{Method:GET URL:/cmdb/api/v1/secret Proto:HTTP/1.1 ProtoMajor:1 ProtoMinor:1 Header:map[Accept:[*/*] Accept-Encoding:[gzip, deflate, 
br] Authorization:[Bearer mJRdh9W2DDNs9T7oDpz4OOC4] Cache-Control:[no-cache] Connection:[keep-alive] Content-Length:[81] Content-Type:[application/json] 
Postman-Token:[2923558d-9931-48db-ad41-6d364a828eee] User-Agent:[PostmanRuntime/7.35.0]] Body:0xc000614300 GetBody:\u003cnil\u003e ContentLength:81 TransferEncoding:[] Close:false Host:127.0.0.1:8020 Form:map[] PostForm:map[] MultipartForm:\u003cnil\u003e Trailer:map[] RemoteAddr:127.0.0.1:50972 RequestURI:/cmdb/api/v1/secret TLS:\u003cnil\u003e Cancel:\u003cnil\u003e Response:\u003cnil\u003e ctx:0xc0003c6460}",
  "response": "{}"
}

使用Exporter实现应用自定义监控

gorestful 框架开启 prom metrics接口
// 启动之前配置钩子
// app 在启动的时候 能不能加载一种中间件逻辑, 比如认证中间件
application.App().HTTP.RouterBuildConfig.BeforeLoad = func(h http.Handler) {
	// 断言 router
	r := h.(*restful.Container)
	// 添加中间件,接入到用户中心
	r.Filter(middleware.RestfulServerInterceptor())

	// 使用gorestful 框架注册一个之定义 handler
	ws := new(restful.WebService)
	ws.Route(ws.GET("/metrics").To(func(r *restful.Request, w *restful.Response) {
		// 基于标准库 包装了一层
		promhttp.Handler().ServeHTTP(w, r.Request)
	}))
	r.Add(ws)
}
补充自定义采集器

记录 kafka 消息的输入 入库报错指标(当前有多少事件由于报错没有入库)

package impl

import "github.com/prometheus/client_golang/prometheus"

func NewEventCollect() *EventCollect {
	return &EventCollect{
		errCountDesc: prometheus.NewDesc(
			"save_event_error_count",
			"事件入库失败个数统计",
			[]string{},
			prometheus.Labels{"service": "maudit"},
		),
	}
}

// 收集事件指标的采集器
type EventCollect struct {
	errCountDesc *prometheus.Desc
	// 需要自己根据实践情况来维护这个变量
	errCount int
}

func (c *EventCollect) Inc() {
	c.errCount++
}

// 指标元数据注册
func (c *EventCollect) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.errCountDesc
}

// 指标的值的采集
func (c *EventCollect) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(c.errCountDesc, prometheus.GaugeValue, float64(c.errCount))
}
初始化时注册到默认注册表
// 将采集器注册到默认注册表
i.colector = NewEventCollect()
prometheus.MustRegister(i.colector)
采集器统计指标数据
for {
	m, err := i.kr.FetchMessage(context.Background())
	if err != nil {
		i.log.Error().Msgf("consume event error, %s", err)
		break
	}
	i.log.Debug().Msgf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

	err = i.kr.CommitMessages(context.Background(), m)
	if err != nil {
		i.log.Error().Msgf("commit error, %s", err)
	}

	// 存储事件 i.SaveEvent(), 模拟存储失败报错
	err = errors.New("save event error")
	if err != nil {
		i.colector.Inc()
	}
}

接口数据

# HELP save_event_error_count 事件入库失败个数统计
# TYPE save_event_error_count gauge
save_event_error_count{service="maudit"} 1

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL