prometheusmetrics

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2025 License: MIT Imports: 9 Imported by: 0

README

Prometheus 指标(开箱即用示例)

本示例提供一套与 BatchSQL 对齐的 Prometheus 指标与 Reporter,支持常用可配置项,做到“拿来即用、按需裁剪”。

  • 指标实现:examples/metrics/prometheus/prometheus_metrics.go
  • Reporter 实现:examples/metrics/prometheus/prometheus_reporter.go
  • 对应仪表板:test/integration/grafana/provisioning/dashboards/batchsql-performance.json(单一 Dashboard)

功能与特性

  • 含重试相关指标:error_type 使用 "retry:"/"final:" 前缀,便于面板聚合
  • 直方图:入队延迟、攒批耗时、执行耗时(覆盖重试/退避)、批大小
  • 仪表:并发度、队列长度、在途批次
  • 可配置项:命名空间/子系统、常量标签、是否启用 test_name/table 维度、Buckets

快速开始

import (
    "context"
    "log"
    bsql "github.com/rushairer/batchsql"
    pm "github.com/rushairer/batchsql/examples/metrics/prometheus"
)

func main() {
    // 1) 创建指标
    m := pm.NewMetrics(pm.Options{
        Namespace:      "batchsql",
        IncludeTestName: true, // 可选
        IncludeTable:    false,
        ConstLabels:     map[string]string{"env":"dev"},
    })

    // 2) 启动 /metrics
    if err := m.StartServer(2112); err != nil {
        log.Fatal(err)
    }
    defer m.StopServer(context.Background())

    // 3) 创建 Reporter 并绑定到 BatchSQL/Executor
    reporter := pm.NewReporter(m, "mysql", "batch_insert")

    exec := bsql.NewSQLThrottledBatchExecutorWithDriver(nil, bsql.DefaultMySQLDriver).
        WithMetricsReporter(reporter) // executor 指标

    batch := bsql.NewBatchSQL(exec) // BatchSQL 内部会使用 reporter 进行观测

    _ = batch // 按需提交任务...
}

配置说明(Options)

  • Namespace/Subsystem:Prometheus 命名空间/子系统
  • ConstLabels:追加到所有指标的常量标签(如 env/region/tenant)
  • IncludeTestName:是否启用 test_name 维度(集成/压测推荐开启)
  • IncludeTable:是否启用 table 维度(注意基数)
  • 各直方图 Buckets:Enqueue/Assemble/Execute/BatchSize

与仪表板的配合

  • 已提供单一 Dashboard:test/integration/grafana/provisioning/dashboards/batchsql-performance.json
  • 面板依赖的关键指标与标签:
    • errors_total{error_type=~"retry:.|final:."}
    • enqueue_latency_seconds, batch_assemble_duration_seconds, execute_duration_seconds, batch_size
    • executor_concurrency, pipeline_queue_length, inflight_batches
    • 维度:database(必)、test_name(若开启)

异常与性能

  • 所有 Reporter 方法在 m=nil 时直接返回,避免空指针
  • 指标写入为加锁/原子开销,谨慎开启高基数标签(如 table)
  • 直方图 Buckets 越多、写入越频繁,资源消耗越大;生产中按 P95/P99 目标合理配置

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics 指标容器

func NewMetrics

func NewMetrics(opts Options) *Metrics

NewMetrics 创建并注册一套与 BatchSQL 对齐的指标

func (*Metrics) Handler

func (m *Metrics) Handler() http.Handler

Handler 返回 /metrics 的 http.Handler

func (*Metrics) StartServer

func (m *Metrics) StartServer(port int) error

StartServer 启动一个简易 HTTP 服务(/metrics)

func (*Metrics) StopServer

func (m *Metrics) StopServer(ctx context.Context) error

StopServer 停止 HTTP 服务

type Options

type Options struct {
	// 指标命名
	Namespace   string            // 如 "batchsql"
	Subsystem   string            // 可为空
	ConstLabels map[string]string // 追加到所有指标的常量标签,如 {"env":"prod","region":"cn"}

	// 标签维度开关(保持开箱可用的最小集)
	IncludeTestName bool // 是否启用 test_name 维度(适合集成/压测)
	IncludeTable    bool // 是否启用 table 维度(注意基数膨胀)

	// 直方图桶
	EnqueueBuckets   []float64
	AssembleBuckets  []float64
	ExecuteBuckets   []float64
	BatchSizeBuckets []float64
}

Options 配置项(可选)

type Reporter

type Reporter struct {

	// 绑定维度
	Database string
	TestName string
	// contains filtered or unexported fields
}

Reporter 实现 batchsql.MetricsReporter,写入 Prometheus 指标

func NewReporter

func NewReporter(m *Metrics, database, testName string) *Reporter

NewReporter 创建 Reporter;database 为必须,testName 可按需要传入

func (*Reporter) DecInflight

func (r *Reporter) DecInflight()

DecInflight 在途-1

func (*Reporter) IncError

func (r *Reporter) IncError(_ string, reason string)

IncError 错误计数(支持 retry:/final: 前缀)

func (*Reporter) IncInflight

func (r *Reporter) IncInflight()

IncInflight 在途+1

func (*Reporter) ObserveBatchAssemble

func (r *Reporter) ObserveBatchAssemble(d time.Duration)

ObserveBatchAssemble 攒批耗时

func (*Reporter) ObserveBatchSize

func (r *Reporter) ObserveBatchSize(n int)

ObserveBatchSize 单独记录批大小

func (*Reporter) ObserveEnqueueLatency

func (r *Reporter) ObserveEnqueueLatency(d time.Duration)

ObserveEnqueueLatency 提交到入队延迟

func (*Reporter) ObserveExecuteDuration

func (r *Reporter) ObserveExecuteDuration(table string, n int, d time.Duration, status string)

ObserveExecuteDuration 执行耗时(含重试与退避)

func (*Reporter) SetConcurrency

func (r *Reporter) SetConcurrency(n int)

SetConcurrency 执行并发度

func (*Reporter) SetQueueLength

func (r *Reporter) SetQueueLength(n int)

SetQueueLength 队列长度

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL