backend

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2020 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	VERSION = "1.0"
)
View Source
const (
	WRITE_QUEUE = 16
)

Variables

View Source
var (
	ErrClosed          = errors.New("write in a closed file")
	ErrBackendNotExist = errors.New("use a backend not exists")
	ErrQueryForbidden  = errors.New("query forbidden")
)
View Source
var (
	ForbidCmds   = "(?i:^\\s*grant|^\\s*revoke|\\(\\)\\$)"
	SupportCmds  = "(?i:from|drop\\s*measurement)"
	ExecutorCmds = "(?i:show\\s*measurements|show\\s*tag\\s*keys|show\\s*series|show\\s*field\\s*keys|show\\s*retention\\s*policies)"
	GlobalCmds   = "(?i:create database\\s)"
)
View Source
var (
	ErrBadRequest = errors.New("Bad Request\n")
	ErrNotFound   = errors.New("Not Found\n")
	ErrInternal   = errors.New("Internal Error")
	ErrUnknown    = errors.New("Unknown Error\n")
)
View Source
var (
	ErrWrongQuote     = errors.New("wrong quote")
	ErrUnmatchedQuote = errors.New("unmatched quote")
	ErrUnclosed       = errors.New("unclosed parenthesis")
	ErrIllegalQL      = errors.New("illegal InfluxQL")
)
View Source
var (
	ErrIllegalConfig = errors.New("illegal config")
)
View Source
var (
	ErrNotClusterQuery = errors.New("not a cluster query")
)

Functions

func BytesToInt64

func BytesToInt64(buf []byte) int64

func Compress

func Compress(buf *bytes.Buffer, p []byte) (err error)

func FindEndWithQuote

func FindEndWithQuote(data []byte, start int, endchar byte) (end int, unquoted []byte, err error)

func GetDBFromInfluxQL added in v1.0.1

func GetDBFromInfluxQL(q string) (m string, err error)

func GetJsonBodyfromSeries

func GetJsonBodyfromSeries(series []seri) (body []byte, err error)

GetJsonBodyfromSeries seri转化为byte

func GetMeasurementFromInfluxQL

func GetMeasurementFromInfluxQL(q string) (m string, err error)

func GetSeriesArray

func GetSeriesArray(sBody []byte) (ss []seri, err error)

GetSeriesArray byte转化为seri

func GzipEncode

func GzipEncode(body []byte, need bool) (b []byte)

GzipEncode 把byte类型压缩

func Int64ToBytes

func Int64ToBytes(i int64) []byte

func ScanKey

func ScanKey(pointbuf []byte) (key string, err error)

func ScanToken

func ScanToken(data []byte, atEOF bool) (advance int, token []byte, err error)

func TrimRight

func TrimRight(p []byte, s []byte) (r []byte)

Faster than bytes.TrimRight; not sure why.

Types

type BackendAPI

type BackendAPI interface {
	Querier
	IsActive() (b bool)
	IsWriteOnly() (b bool)
	Ping() (version string, err error)
	GetZone() (zone string)
	GetDB() (db string)
	Write(p []byte) (err error)
	Close() (err error)
	QueryResp(req *http.Request) (header http.Header, status int, body []byte, err error)
}

type BackendConfig

type BackendConfig struct {
	URL             string
	DB              string
	BasicAuth       *BasicAuth
	Zone            string
	Interval        int
	Timeout         int
	TimeoutQuery    int
	MaxRowLimit     int
	CheckInterval   int
	RewriteInterval int
	WriteOnly       int
}

type Backends

type Backends struct {
	*HttpBackend

	Interval        int
	RewriteInterval int
	MaxRowLimit     int32
	// contains filtered or unexported fields
}

func NewBackends

func NewBackends(cfg *BackendConfig, name string, storedir string) (bs *Backends, err error)

maybe ch_timer is not the best way. NewBackends 新建一个Backends对象

func (*Backends) Close

func (bs *Backends) Close() (err error)

Close 退出worker,关闭管道

func (*Backends) Flush

func (bs *Backends) Flush()

Flush 清空管道中的数据到备份文件中

func (*Backends) GetDB added in v1.0.1

func (bs *Backends) GetDB() (db string)

func (*Backends) Idle

func (bs *Backends) Idle()

Idle 数据写入influxdb

func (*Backends) Rewrite

func (bs *Backends) Rewrite() (err error)

func (*Backends) RewriteLoop

func (bs *Backends) RewriteLoop()

RewriteLoop

func (*Backends) Write

func (bs *Backends) Write(p []byte) (err error)

Write 把[]byte类型p发送到ch_write管道中

func (*Backends) WriteBuffer

func (bs *Backends) WriteBuffer(p []byte)

WriteBuffer 对象p写进bs.buffer

type BasicAuth

type BasicAuth struct {
	Username string
	Password string
}

type FileBackend

type FileBackend struct {
	// contains filtered or unexported fields
}

func NewFileBackend

func NewFileBackend(filename string, storedir string) (fb *FileBackend, err error)

func (*FileBackend) CleanUp

func (fb *FileBackend) CleanUp() (err error)

CleanUp

func (*FileBackend) Close

func (fb *FileBackend) Close()

func (*FileBackend) IsData

func (fb *FileBackend) IsData() (dataflag bool)

IsData 查看数据标识位dataflag

func (*FileBackend) Read

func (fb *FileBackend) Read() (p []byte, err error)

FIXME: signal here

func (*FileBackend) RollbackMeta

func (fb *FileBackend) RollbackMeta() (err error)

func (*FileBackend) UpdateMeta

func (fb *FileBackend) UpdateMeta() (err error)

func (*FileBackend) Write

func (fb *FileBackend) Write(p []byte) (err error)

Write 写到文件中

type FileConfigSource

type FileConfigSource struct {
	BACKENDS     map[string]BackendConfig
	KEYMAPS      map[string]map[string][]string
	NODES        map[string]NodeConfig
	DEFAULT_NODE NodeConfig
	// contains filtered or unexported fields
}

func NewFileConfigSource

func NewFileConfigSource(cfgfile string, node string) (fcs *FileConfigSource)

func (*FileConfigSource) LoadBackends

func (fcs *FileConfigSource) LoadBackends() (backends map[string]*BackendConfig, err error)

func (*FileConfigSource) LoadMeasurements

func (fcs *FileConfigSource) LoadMeasurements() (m_map map[string]map[string][]string, err error)

func (*FileConfigSource) LoadNode

func (fcs *FileConfigSource) LoadNode() (nodecfg NodeConfig, err error)

type HttpBackend

type HttpBackend struct {
	BasicAuth *BasicAuth

	Interval int
	URL      string
	DB       string
	Zone     string
	Active   bool

	WriteOnly int
	// contains filtered or unexported fields
}

func NewHttpBackend

func NewHttpBackend(cfg *BackendConfig) (hb *HttpBackend)

func (*HttpBackend) CheckActive

func (hb *HttpBackend) CheckActive()

func (*HttpBackend) Close

func (hb *HttpBackend) Close() (err error)

func (*HttpBackend) GetZone

func (hb *HttpBackend) GetZone() (zone string)

func (*HttpBackend) IsActive

func (hb *HttpBackend) IsActive() bool

func (*HttpBackend) IsWriteOnly

func (hb *HttpBackend) IsWriteOnly() bool

func (*HttpBackend) Ping

func (hb *HttpBackend) Ping() (version string, err error)

func (*HttpBackend) Query

func (hb *HttpBackend) Query(w http.ResponseWriter, req *http.Request) (err error)

Don't set Accept-Encoding: gzip here; let the real client do so. If the real client doesn't support gzip and we set it anyway, that would be a mistake.

func (*HttpBackend) QueryResp

func (hb *HttpBackend) QueryResp(req *http.Request) (header http.Header, status int, body []byte, err error)

func (*HttpBackend) Write

func (hb *HttpBackend) Write(p []byte) (err error)

func (*HttpBackend) WriteCompressed

func (hb *HttpBackend) WriteCompressed(p []byte) (err error)

func (*HttpBackend) WriteStream

func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool) (err error)

type InfluxCluster

type InfluxCluster struct {
	Zone string

	ForbiddenQuery []*regexp.Regexp
	ObligatedQuery []*regexp.Regexp

	WriteTracing int
	QueryTracing int
	// contains filtered or unexported fields
}

func NewInfluxCluster

func NewInfluxCluster(cfgsrc *FileConfigSource, nodecfg *NodeConfig, storedir string) (ic *InfluxCluster)

func (*InfluxCluster) AddNext

func (ic *InfluxCluster) AddNext(ba BackendAPI)

func (*InfluxCluster) CheckQuery

func (ic *InfluxCluster) CheckQuery(q string) (err error)

func (*InfluxCluster) Close

func (ic *InfluxCluster) Close() (err error)

func (*InfluxCluster) EnsureQuery

func (ic *InfluxCluster) EnsureQuery(s string) (err error)

func (*InfluxCluster) Flush

func (ic *InfluxCluster) Flush()

func (*InfluxCluster) ForbidQuery

func (ic *InfluxCluster) ForbidQuery(s string) (err error)

func (*InfluxCluster) GetBackends

func (ic *InfluxCluster) GetBackends(measurement, db string) (backends []BackendAPI, ok bool)

func (*InfluxCluster) GlobalQuery added in v1.0.1

func (ic *InfluxCluster) GlobalQuery(q string) bool

func (*InfluxCluster) LoadConfig

func (ic *InfluxCluster) LoadConfig() (err error)

func (*InfluxCluster) Ping

func (ic *InfluxCluster) Ping() (version string, err error)

func (*InfluxCluster) Query

func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err error)

func (*InfluxCluster) QueryAll

func (ic *InfluxCluster) QueryAll(req *http.Request) (sHeader http.Header, bodys [][]byte, err error)

func (*InfluxCluster) ShowQuery

func (ic *InfluxCluster) ShowQuery(w http.ResponseWriter, req *http.Request) (err error)

func (*InfluxCluster) Write

func (ic *InfluxCluster) Write(p []byte, precision string, db string) (err error)

func (*InfluxCluster) WriteRow

func (ic *InfluxCluster) WriteRow(line []byte, precision string, db string)

An error in one row will not stop the others, so don't try to return an error; just print it.

func (*InfluxCluster) WriteStatistics

func (ic *InfluxCluster) WriteStatistics() (err error)

type InfluxQLExecutor

type InfluxQLExecutor struct {
}

func (*InfluxQLExecutor) Query

func (iqe *InfluxQLExecutor) Query(w http.ResponseWriter, req *http.Request) (err error)

type NodeConfig

type NodeConfig struct {
	ListenAddr   string
	Zone         string
	Nexts        string
	Interval     int
	IdleTimeout  int
	WriteTracing int
	QueryTracing int
}

type Querier

type Querier interface {
	Query(w http.ResponseWriter, req *http.Request) (err error)
}

type Statistics

type Statistics struct {
	QueryRequests        int64
	QueryRequestsFail    int64
	WriteRequests        int64
	WriteRequestsFail    int64
	PingRequests         int64
	PingRequestsFail     int64
	PointsWritten        int64
	PointsWrittenFail    int64
	WriteRequestDuration int64
	QueryRequestDuration int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL