influxdb

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 23 Imported by: 0

README

InfluxDB

Docker部署

pull image

docker pull bitnami/influxdb:latest
2.x
docker run -itd \
    --name influxdb2-server \
    -p 8086:8086 \
    -e INFLUXDB_HTTP_AUTH_ENABLED=true \
    -e INFLUXDB_ADMIN_USER=admin \
    -e INFLUXDB_ADMIN_USER_PASSWORD=123456789 \
    -e INFLUXDB_ADMIN_USER_TOKEN=admintoken123 \
    -e INFLUXDB_DB=my_database \
    bitnami/influxdb:2.7.11

create admin user sql script:

create user "admin" with password '123456789' with all privileges

管理后台: http://localhost:8086/

3.x
docker run -itd \
    --name influxdb3-server \
    -p 8181:8181 \
    -e INFLUXDB_NODE_ID=0 \
    -e INFLUXDB_HTTP_PORT_NUMBER=8181 \
    -e INFLUXDB_HTTP_AUTH_ENABLED=true \
    -e INFLUXDB_CREATE_ADMIN_TOKEN=yes \
    -e INFLUXDB_DB=my_database \
    bitnami/influxdb:latest

docker run -itd \
  --name influxdb3-explorer \
  -p 8888:80 \
  -p 8889:8888 \
  quay.io/influxdb/influxdb3-explorer:latest \
  --mode=admin

这个版本分离出来一个管理后台 InfluxDB Explorer:http://localhost:8888/

在管理后台填写:http://host.docker.internal:8181

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInfluxDBClientNotInitialized = errors.InternalServer("INFLUXDB_CLIENT_NOT_INITIALIZED", "client not initialized")

	ErrInfluxDBConnectFailed = errors.InternalServer("INFLUXDB_CONNECT_FAILED", "connect failed")

	ErrInfluxDBCreateDatabaseFailed = errors.InternalServer("INFLUXDB_CREATE_DATABASE_FAILED", "database create failed")

	ErrInfluxDBQueryFailed = errors.InternalServer("INFLUXDB_QUERY_FAILED", "query failed")

	ErrClientNotConnected = errors.InternalServer("INFLUXDB_CLIENT_NOT_CONNECTED", "client not connected")

	ErrInvalidPoint = errors.InternalServer("INFLUXDB_INVALID_POINT", "invalid point")

	ErrNoPointsToInsert = errors.InternalServer("INFLUXDB_NO_POINTS_TO_INSERT", "no points to insert")

	ErrEmptyData = errors.InternalServer("INFLUXDB_EMPTY_DATA", "empty data")

	ErrBatchInsertFailed = errors.InternalServer("INFLUXDB_BATCH_INSERT_FAILED", "batch insert failed")

	ErrInsertFailed = errors.InternalServer("INFLUXDB_INSERT_FAILED", "insert failed")
)

Functions

func BatchInsert

func BatchInsert[T any](ctx context.Context, c *Client, data []*T, mapper Mapper[T]) error

BatchInsert 批量插入数据

func BoolToString

func BoolToString(value *bool) string

func BuildQuery

func BuildQuery(
	table string,
	filters map[string]interface{},
	operators map[string]string,
	fields []string,
) (string, []interface{})

func BuildQueryWithParams

func BuildQueryWithParams(
	table string,
	filters map[string]interface{},
	operators map[string]string,
	fields []string,
) string

func ConvertAnyToPointsSafe

func ConvertAnyToPointsSafe(pts []any) ([]*influxdb3.Point, error)

ConvertAnyToPointsSafe 将 []any 逐元素断言为 []*influxdb3.Point

func GetBoolPointTag

func GetBoolPointTag(point *influxdb3.Point, name string) *bool

func GetEnumPointTag

func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T

func GetPointTag

func GetPointTag(point *influxdb3.Point, name string) *string

func GetTimestampField

func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp

func GetUint32Field

func GetUint32Field(point *influxdb3.Point, name string) *uint32

func GetUint32PointTag

func GetUint32PointTag(point *influxdb3.Point, name string) *uint32

func GetUint64PointTag

func GetUint64PointTag(point *influxdb3.Point, name string) *uint64

func Insert

func Insert[T any](ctx context.Context, c *Client, data *T, mapper Mapper[T]) error

Insert 插入数据

func ProtoMessageToPoint

func ProtoMessageToPoint(msg proto.Message, overrides map[string]string) (*influxdb3.Point, error)

ProtoMessageToPoint 将 protobuf message 转为 influxdb3.Point。 参数 overrides 可选:map[key]=role,key 为 protobuf 字段的 JSON 名称(例如 "deviceId"),role 为 "measurement"/"tag"/"field"/"time"。 约定识别规则(在无 overrides 时):

  • 字段名为 "measurement" -> measurement
  • 名称以 "_tag" 或以 "tag_" 前缀 -> tag
  • 字段类型为 google.protobuf.Timestamp 或名为 "time"/"timestamp" -> time
  • 其它标量 -> field

example.proto:

syntax = "proto3"; import "google/protobuf/timestamp.proto";

message SensorProto {
		string measurement = 1;           // 当作为 measurement 使用
		string device_id = 2;             // 作为 tag
		string location = 3;              // 作为 tag
		double temperature = 4;           // field
		int64 battery = 5;                // field
		google.protobuf.Timestamp ts = 6; // time
}

Go 使用示例(将 pb 替换为实际生成包路径,例如 `github.com/you/project/pb`)

package influxdb_test

import (

"fmt"
"time"

"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"google.golang.org/protobuf/types/known/timestamppb"

// 替换为你的 protobuf 生成包
pb "path/to/your/generated/pb"

// 假设 ProtoMessageToPoint 位于本模块的 influxdb 包
"your/module/path/influxdb"

)

func ExampleProtoMessageToPoint_basic() {
	// 构造 protobuf message
	msg := &pb.SensorProto{
		Measurement: "sensors",
		DeviceId:    "dev-1",
		Location:    "room1",
		Temperature: 23.5,
		Battery:     95,
		Ts:          timestamppb.New(time.Now().UTC()),
	}

	// 不传 overrides:按约定自动分类(字段名 measurement -> measurement,ts -> time,带 tag hint 的字段 -> tag)
	pt, err := influxdb.ProtoMessageToPoint(msg, nil)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	// 打印示例:measurement/tags/fields/time
	fmt.Println("measurement:", pt.GetMeasurement())
	fmt.Println("tag device_id:", pt.GetTag("device_id"))
	fmt.Println("field temperature:", pt.GetField("temperature"))
}

func ExampleProtoMessageToPoint_withOverrides() {
	msg := &pb.SensorProto{
		// 假设 proto 中没有 measurement 字段或你希望使用其它字段作为 measurement
		DeviceId:    "dev-1",
		Location:    "room1",
		Temperature: 23.5,
		Battery:     95,
		Ts:          timestamppb.New(time.Now().UTC()),
	}

	// overrides 的 key 使用 protobuf 字段的 JSON 名称(例如 proto 字段 device_id 的 json 名称通常为 "deviceId")
	overrides := map[string]string{
		"deviceId":    "measurement", // 把 deviceId 当作 measurement
		"location":    "tag",         // 强制 location 为 tag
		"battery":     "field",
		"ts":          "time",
		"temperature": "field",
	}

	pt, err := influxdb.ProtoMessageToPoint(msg, overrides)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	fmt.Println("measurement:", pt.GetMeasurement())
	fmt.Println("tags:", pt.GetTags())
	fmt.Println("fields:", pt.GetFields())
}

func Query

func Query[T any](ctx context.Context, c *Client, query string, mapper Mapper[T]) ([]*T, error)

Query 查询数据

func StructToPoint

func StructToPoint(s interface{}) (*influxdb3.Point, error)

StructToPoint 通用转换函数:将带 influx tag 的 struct 转为 influxdb3.Point 参数:s 带 tag 的 struct 实例(不能是指针)

func Uint64ToString

func Uint64ToString(value *uint64) string

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...Option) (*Client, error)

func (*Client) BatchInsert

func (c *Client) BatchInsert(ctx context.Context, points []*influxdb3.Point) error

BatchInsert 批量插入数据

func (*Client) Close

func (c *Client) Close()

Close 关闭InfluxDB客户端

func (*Client) Count

func (c *Client) Count(ctx context.Context, query string) (int64, error)

Count 执行计数查询并返回解析到的数量

func (*Client) ExecInfluxQLQuery

func (c *Client) ExecInfluxQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)

ExecInfluxQLQuery 执行 Flux/InfluxQL 查询并返回原始迭代器

func (*Client) ExecSQLQuery

func (c *Client) ExecSQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)

ExecSQLQuery 执行 SQL 查询并返回原始迭代器

func (*Client) Exist

func (c *Client) Exist(ctx context.Context, query string) (bool, error)

Exist 执行查询并判断是否存在记录(有任意一条记录即为存在)

func (*Client) Insert

func (c *Client) Insert(ctx context.Context, point *influxdb3.Point) error

Insert 插入数据

func (*Client) Query

func (c *Client) Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error)

Query 查询数据

func (*Client) QueryWithParams

func (c *Client) QueryWithParams(
	ctx context.Context,
	table string,
	filters map[string]interface{},
	operators map[string]string,
	fields []string,
) (*influxdb3.QueryIterator, error)

QueryWithParams 使用参数化方式查询数据

func (*Client) ServerVersion

func (c *Client) ServerVersion() string

ServerVersion 获取InfluxDB服务器版本

func (*Client) WritePoints

func (c *Client) WritePoints(ctx context.Context, pts []any) error

WritePoints 将通用点集合写入 InfluxDB;仅支持传入的元素为 *influxdb3.Point

func (*Client) WritePointsStrict

func (c *Client) WritePointsStrict(ctx context.Context, points []*influxdb3.Point) error

WritePointsStrict 接受严格类型 []*influxdb3.Point 并写入

type Mapper

type Mapper[T any] interface {
	// ToPoint 将数据转换为InfluxDB的Point格式
	ToPoint(data *T) *influxdb3.Point

	// ToData 将InfluxDB的Point转换为原始数据
	ToData(point *influxdb3.Point) *T
}

Mapper 数据转换的接口

type Option

type Option func(o *Client)

func WithAuthScheme

func WithAuthScheme(authScheme string) Option

func WithDatabase

func WithDatabase(database string) Option

func WithHost

func WithHost(host string) Option

func WithIdleConnectionTimeout

func WithIdleConnectionTimeout(idleTimeout time.Duration) Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithMaxIdleConnections

func WithMaxIdleConnections(maxIdle int) Option

func WithOptions

func WithOptions(opts *influxdb3.ClientConfig) Option

func WithOrganization

func WithOrganization(organization string) Option

func WithQueryTimeout

func WithQueryTimeout(timeout time.Duration) Option

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) Option

func WithToken

func WithToken(token string) Option

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

type Repository

type Repository[DTO any, ENTITY any] struct {
	// contains filtered or unexported fields
}

Repository MongoDB 版仓库(泛型)

func NewRepository

func NewRepository[DTO any, ENTITY any](client *Client, collection string, logger *log.Helper) *Repository[DTO, ENTITY]

func (*Repository[DTO, ENTITY]) BatchCreate

func (r *Repository[DTO, ENTITY]) BatchCreate(_ context.Context, dtos []*DTO) ([]*DTO, error)

BatchCreate 批量插入

func (*Repository[DTO, ENTITY]) Count

func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (int64, error)

Count 按给定 builder 中的 filter 统计数量

func (*Repository[DTO, ENTITY]) Create

func (r *Repository[DTO, ENTITY]) Create(_ context.Context, dto *DTO) (*DTO, error)

Create 插入一条记录

func (*Repository[DTO, ENTITY]) Exists

func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)

Exists 检查是否存在符合条件的记录

func (*Repository[DTO, ENTITY]) ListWithPagination

func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) ([]*DTO, int64, error)

ListWithPagination 针对 paginationV1.PaginationRequest 的列表查询

func (*Repository[DTO, ENTITY]) ListWithPaging

func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) ([]*DTO, int64, error)

ListWithPaging 针对 paginationV1.PagingRequest 的列表查询(兼容 Query/OrQuery/FilterExpr)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL