backend

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2020 License: Apache-2.0 Imports: 26 Imported by: 5

Documentation

Index

Constants

View Source
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
	MaxSendRetry                 = 10
)

send

View Source
const DefaultSendQueueMaxSize = 102400 //10.24w

Variables

View Source
var (
	Config BackendSection
	// 服务节点的一致性哈希环 pk -> node
	TsdbNodeRing *ConsistentHashRing

	// 发送缓存队列 node -> queue_of_data
	TsdbQueues    = make(map[string]*list.SafeListLimited)
	JudgeQueues   = cache.SafeJudgeQueue{}
	InfluxdbQueue *list.SafeListLimited
	OpenTsdbQueue *list.SafeListLimited

	// 连接池 node_address -> connection_pool
	TsdbConnPools          *pools.ConnPools
	JudgeConnPools         *pools.ConnPools
	OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
)
View Source
var (
	MinStep int //最小上报周期,单位sec
)

Functions

func FetchData

func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse

func FetchDataForUI

func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

func GenQParam

func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam

func GetCounter

func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error)

func GetJudges

func GetJudges() []string

func GetSeries

func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error)

func Init

func Init(cfg BackendSection)

func Push2InfluxdbSendQueue added in v1.4.0

func Push2InfluxdbSendQueue(items []*dataobj.MetricValue)

将原始数据插入到influxdb缓存队列

func Push2JudgeSendQueue

func Push2JudgeSendQueue(items []*dataobj.MetricValue)

func Push2OpenTsdbSendQueue added in v1.4.0

func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue)

将原始数据入到tsdb发送缓存队列

func Push2TsdbSendQueue

func Push2TsdbSendQueue(items []*dataobj.MetricValue)

Push2TsdbSendQueue pushes data to a TSDB instance which depends on the consistent ring.

func QueryOne

func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)

func RebuildConsistentHashRing

func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)

func Send2JudgeTask

func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)

func Send2TsdbTask

func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int)

func TagMatch

func TagMatch(straTags []model.Tag, tag map[string]string) bool

Types

type BackendSection

type BackendSection struct {
	Enabled      bool   `yaml:"enabled"`
	Batch        int    `yaml:"batch"`
	ConnTimeout  int    `yaml:"connTimeout"`
	CallTimeout  int    `yaml:"callTimeout"`
	WorkerNum    int    `yaml:"workerNum"`
	MaxConns     int    `yaml:"maxConns"`
	MaxIdle      int    `yaml:"maxIdle"`
	IndexTimeout int    `yaml:"indexTimeout"`
	StraPath     string `yaml:"straPath"`
	HbsMod       string `yaml:"hbsMod"`

	Replicas    int                     `yaml:"replicas"`
	Cluster     map[string]string       `yaml:"cluster"`
	ClusterList map[string]*ClusterNode `json:"clusterList"`
	Influxdb    InfluxdbSection         `yaml:"influxdb"`
	OpenTsdb    OpenTsdbSection         `yaml:"opentsdb"`
}

type ClusterNode

type ClusterNode struct {
	Addrs []string `json:"addrs"`
}

type ConsistentHashRing

type ConsistentHashRing struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConsistentHashRing

func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing

func (*ConsistentHashRing) GetNode

func (c *ConsistentHashRing) GetNode(pk string) (string, error)

func (*ConsistentHashRing) GetRing

func (*ConsistentHashRing) Set

type InfluxClient added in v1.4.0

type InfluxClient struct {
	Client    client.Client
	Database  string
	Precision string
}

func NewInfluxdbClient added in v1.4.0

func NewInfluxdbClient() (*InfluxClient, error)

func (*InfluxClient) Send added in v1.4.0

func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error

type InfluxdbSection added in v1.4.0

type InfluxdbSection struct {
	Enabled   bool   `yaml:"enabled"`
	Batch     int    `yaml:"batch"`
	MaxRetry  int    `yaml:"maxRetry"`
	WorkerNum int    `yaml:"workerNum"`
	Timeout   int    `yaml:"timeout"`
	Address   string `yaml:"address"`
	Database  string `yaml:"database"`
	Username  string `yaml:"username"`
	Password  string `yaml:"password"`
	Precision string `yaml:"precision"`
}

type OpenTsdbSection added in v1.4.0

type OpenTsdbSection struct {
	Enabled     bool   `yaml:"enabled"`
	Batch       int    `yaml:"batch"`
	ConnTimeout int    `yaml:"connTimeout"`
	CallTimeout int    `yaml:"callTimeout"`
	WorkerNum   int    `yaml:"workerNum"`
	MaxConns    int    `yaml:"maxConns"`
	MaxIdle     int    `yaml:"maxIdle"`
	MaxRetry    int    `yaml:"maxRetry"`
	Address     string `yaml:"address"`
}

type Pool

type Pool struct {
	Pool *pool.ConnPool
	Addr string
}

func SelectPoolByPK

func SelectPoolByPK(pk string) ([]Pool, error)

type Series

type Series struct {
	Endpoints []string `json:"endpoints"`
	Metric    string   `json:"metric"`
	Tags      []string `json:"tags"`
	Step      int      `json:"step"`
	DsType    string   `json:"dstype"`
}

type SeriesReq

type SeriesReq struct {
	Endpoints []string `json:"endpoints"`
	Metric    string   `json:"metric"`
	Tagkv     []*Tagkv `json:"tagkv"`
}

type SeriesResp

type SeriesResp struct {
	Dat []Series `json:"dat"`
	Err string   `json:"err"`
}

type Tagkv

type Tagkv struct {
	TagK string   `json:"tagk"`
	TagV []string `json:"tagv"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL