backend

package
v1.4.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2021 License: MIT Imports: 27 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyBackends       = errors.New("backends cannot be empty")
	ErrEmptyKeyMaps        = errors.New("keymaps cannot be empty")
	ErrBackendNotExist     = errors.New("backend not exists")
	ErrMethodNotAllowed    = errors.New("method not allowed")
	ErrEmptyQuery          = errors.New("empty query")
	ErrOnClauseForbidden   = errors.New("on clause forbidden")
	ErrDatabaseNotFound    = errors.New("database not found")
	ErrDatabaseForbidden   = errors.New("database forbidden")
	ErrQueryError          = errors.New("query error")
	ErrGetMeasurement      = errors.New("can't get measurement")
	ErrUnknownMeasurement  = errors.New("unknown measurement")
	ErrBackendsUnavailable = errors.New("backends unavailable")
)
View Source
var (
	Version   = "not build"
	GitCommit = "not build"
	BuildTime = "not build"
)
View Source
var (
	ErrBadRequest   = errors.New("bad request")
	ErrUnauthorized = errors.New("unauthorized")
	ErrNotFound     = errors.New("not found")
	ErrInternal     = errors.New("internal error")
	ErrUnknown      = errors.New("unknown error")
)
View Source
var (
	ErrWrongQuote     = errors.New("wrong quote")
	ErrUnmatchedQuote = errors.New("unmatched quote")
	ErrUnclosed       = errors.New("unclosed parenthesis")
	ErrIllegalQL      = errors.New("illegal InfluxQL")
)
View Source
var StatisticsMeasurementName = "influx.proxy.statistics"
View Source
var SupportCmds = mapset.NewSet(
	"show measurements",
	"show series",
	"show field keys",
	"show tag keys",
	"show tag values",
	"show retention policies",
	"show stats",
	"show databases",
	"create database",
	"delete from",
	"drop series from",
	"drop measurement",
)

Functions

func AppendNano added in v1.4.5

func AppendNano(line []byte, precision string) []byte

func BytesToInt64 added in v1.1.0

func BytesToInt64(buf []byte) int64

func CheckDatabaseFromTokens added in v1.4.3

func CheckDatabaseFromTokens(tokens []string) (check bool, show bool, db string)

func CheckDeleteOrDropMeasurementFromTokens added in v1.4.2

func CheckDeleteOrDropMeasurementFromTokens(tokens []string) (check bool)

func CheckQuery added in v1.4.2

func CheckQuery(q string) (tokens []string, check bool, from bool)

func CheckSelectOrShowFromTokens added in v1.4.2

func CheckSelectOrShowFromTokens(tokens []string) (check bool)

func CloneQueryRequest added in v1.4.7

func CloneQueryRequest(r *http.Request) *http.Request

func Compress

func Compress(buf *bytes.Buffer, p []byte) (err error)

func CopyHeader added in v1.4.6

func CopyHeader(dst, src http.Header)

func FindEndWithQuote

func FindEndWithQuote(data []byte, start int, endchar byte) (end int, unquoted []byte, err error)

func FindLastIndexWithIdent added in v1.4.8

func FindLastIndexWithIdent(m string) (i int)

func GetDatabaseFromInfluxQL added in v1.4.2

func GetDatabaseFromInfluxQL(q string) (m string, err error)

func GetDatabaseFromTokens added in v1.4.2

func GetDatabaseFromTokens(tokens []string) (m string, err error)

func GetHeadStmtFromTokens added in v1.4.2

func GetHeadStmtFromTokens(tokens []string, n int) (stmt string)

func GetIdentifierFromTokens added in v1.4.2

func GetIdentifierFromTokens(tokens []string, keywords []string, fn func([]string) string) (m string, err error)

func GetMeasurementFromInfluxQL

func GetMeasurementFromInfluxQL(q string) (m string, err error)

func GetMeasurementFromTokens added in v1.4.2

func GetMeasurementFromTokens(tokens []string) (m string, err error)

func GzipCompress added in v1.1.0

func GzipCompress(b []byte) (cb []byte, err error)

func HeadStmtInSupportCmds added in v1.4.6

func HeadStmtInSupportCmds(tokens []string, n int) (in bool)

func Int64ToBytes added in v1.1.0

func Int64ToBytes(n int64) []byte

func NewTransport added in v1.4.4

func NewTransport(tlsSkip bool) (transport *http.Transport)

func QueryInParallel added in v1.4.7

func QueryInParallel(backends []BackendAPI, req *http.Request, fn func(BackendAPI, *http.Request)) (bodies [][]byte, header http.Header, inactive int, err error)

func RapidCheck added in v1.4.5

func RapidCheck(buf []byte) bool

func ScanKey

func ScanKey(pointbuf []byte) (key string, err error)

func ScanTime added in v1.2.1

func ScanTime(buf []byte) (int, bool)

func ScanToken

func ScanToken(data []byte, atEOF bool) (advance int, token []byte, err error)

func ScanTokens added in v1.4.2

func ScanTokens(q string, n int) (tokens []string)

func SeriesFromResponseBytes added in v1.3.1

func SeriesFromResponseBytes(b []byte) (series models.Rows, e error)

func Write added in v1.4.5

func Write(w http.ResponseWriter, body []byte, gzip bool)

func WriteResp added in v1.4.5

func WriteResp(w http.ResponseWriter, req *http.Request, rsp *Response, header http.Header)

Types

type BackendAPI

type BackendAPI interface {
	Querier
	IsActive() (b bool)
	IsRewriting() (b bool)
	IsWriteOnly() (b bool)
	Ping() (version string, err error)
	Write(p []byte) (err error)
	Close() (err error)
	QuerySink(req *http.Request) (qr *QueryResult)
}

type BackendConfig

type BackendConfig struct {
	URL             string `mapstructure:"url"`
	DB              string `mapstructure:"db"`
	Username        string `mapstructure:"username"`
	Password        string `mapstructure:"password"`
	FlushSize       int    `mapstructure:"flush_size"`
	FlushTime       int    `mapstructure:"flush_time"`
	Timeout         int    `mapstructure:"timeout"`
	CheckInterval   int    `mapstructure:"check_interval"`
	RewriteInterval int    `mapstructure:"rewrite_interval"`
	ConnPoolSize    int    `mapstructure:"conn_pool_size"`
	WriteOnly       bool   `mapstructure:"write_only"`
}

type Backends

type Backends struct {
	*HttpBackend
	// contains filtered or unexported fields
}

func NewBackends

func NewBackends(cfg *BackendConfig, name string, datadir string) (bs *Backends, err error)

maybe ch_timer is not the best way.

func (*Backends) Close

func (bs *Backends) Close() (err error)

func (*Backends) Flush

func (bs *Backends) Flush()

func (*Backends) Idle

func (bs *Backends) Idle()

func (*Backends) Rewrite

func (bs *Backends) Rewrite() (err error)

func (*Backends) RewriteLoop

func (bs *Backends) RewriteLoop()

func (*Backends) Write

func (bs *Backends) Write(p []byte) (err error)

func (*Backends) WriteBuffer

func (bs *Backends) WriteBuffer(p []byte)

type FileBackend

type FileBackend struct {
	// contains filtered or unexported fields
}

func NewFileBackend

func NewFileBackend(filename string, datadir string) (fb *FileBackend, err error)

func (*FileBackend) CleanUp

func (fb *FileBackend) CleanUp() (err error)

func (*FileBackend) Close

func (fb *FileBackend) Close()

func (*FileBackend) IsData

func (fb *FileBackend) IsData() (dataflag bool)

func (*FileBackend) Read

func (fb *FileBackend) Read() (p []byte, err error)

FIXME: signal here

func (*FileBackend) RollbackMeta

func (fb *FileBackend) RollbackMeta() (err error)

func (*FileBackend) UpdateMeta

func (fb *FileBackend) UpdateMeta() (err error)

func (*FileBackend) Write

func (fb *FileBackend) Write(p []byte) (err error)

type FileConfigSource

type FileConfigSource struct {
	BACKENDS map[string]BackendConfig `mapstructure:"BACKENDS"`
	KEYMAPS  map[string][]string      `mapstructure:"KEYMAPS"`
	NODE     NodeConfig               `mapstructure:"NODE"`
}

func NewFileConfigSource

func NewFileConfigSource(cfgfile string) (fcs *FileConfigSource, err error)

func (*FileConfigSource) LoadBackends

func (fcs *FileConfigSource) LoadBackends() (backends map[string]*BackendConfig, err error)

func (*FileConfigSource) LoadMeasurements

func (fcs *FileConfigSource) LoadMeasurements() (m_map map[string][]string, err error)

func (*FileConfigSource) LoadNode

func (fcs *FileConfigSource) LoadNode() (nodecfg NodeConfig)

type HttpBackend

type HttpBackend struct {
	URL      string
	DB       string
	Username string
	Password string
	// contains filtered or unexported fields
}

func NewHttpBackend

func NewHttpBackend(cfg *BackendConfig) (hb *HttpBackend)

TODO: query timeout? use req.Cancel

func (*HttpBackend) CheckActive

func (hb *HttpBackend) CheckActive()

TODO: update active when calling successed or failed.

func (*HttpBackend) Close

func (hb *HttpBackend) Close() (err error)

func (*HttpBackend) IsActive

func (hb *HttpBackend) IsActive() (b bool)

func (*HttpBackend) IsRewriting added in v1.4.8

func (hb *HttpBackend) IsRewriting() (b bool)

func (*HttpBackend) IsWriteOnly

func (hb *HttpBackend) IsWriteOnly() (b bool)

func (*HttpBackend) Ping

func (hb *HttpBackend) Ping() (version string, err error)

func (*HttpBackend) Query

func (hb *HttpBackend) Query(w http.ResponseWriter, req *http.Request) (err error)

Don't setup Accept-Encoding: gzip. Let real client do so. If real client don't support gzip and we setted, it will be a mistake.

func (*HttpBackend) QuerySink added in v1.4.5

func (hb *HttpBackend) QuerySink(req *http.Request) (qr *QueryResult)

func (*HttpBackend) SetRewriting added in v1.4.8

func (hb *HttpBackend) SetRewriting(b bool)

func (*HttpBackend) Write

func (hb *HttpBackend) Write(p []byte) (err error)

func (*HttpBackend) WriteCompressed

func (hb *HttpBackend) WriteCompressed(p []byte) (err error)

func (*HttpBackend) WriteStream

func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool) (err error)

type InfluxCluster

type InfluxCluster struct {
	DB           string
	WriteTracing bool
	QueryTracing bool
	// contains filtered or unexported fields
}

func NewInfluxCluster

func NewInfluxCluster(cfgsrc *FileConfigSource, nodecfg *NodeConfig) (ic *InfluxCluster)

func (*InfluxCluster) Close

func (ic *InfluxCluster) Close() (err error)

func (*InfluxCluster) Flush

func (ic *InfluxCluster) Flush()

func (*InfluxCluster) GetBackends

func (ic *InfluxCluster) GetBackends(key string) (backends []BackendAPI, ok bool)

func (*InfluxCluster) LoadConfig

func (ic *InfluxCluster) LoadConfig() (err error)

func (*InfluxCluster) Ping

func (ic *InfluxCluster) Ping() (version string, err error)

func (*InfluxCluster) Query

func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err error)

func (*InfluxCluster) Write

func (ic *InfluxCluster) Write(p []byte, precision string) (err error)

func (*InfluxCluster) WriteRow

func (ic *InfluxCluster) WriteRow(line []byte)

Wrong in one row will not stop others. So don't try to return error, just print it.

func (*InfluxCluster) WriteStatistics

func (ic *InfluxCluster) WriteStatistics() (err error)

type InfluxQLExecutor

type InfluxQLExecutor struct {
	// contains filtered or unexported fields
}

func (*InfluxQLExecutor) Query

func (iqe *InfluxQLExecutor) Query(w http.ResponseWriter, req *http.Request, tokens []string) (err error)

func (*InfluxQLExecutor) QueryCreateQL added in v1.3.0

func (iqe *InfluxQLExecutor) QueryCreateQL(w http.ResponseWriter, req *http.Request, tokens []string) (err error)

func (*InfluxQLExecutor) QueryDeleteOrDropQL added in v1.4.2

func (iqe *InfluxQLExecutor) QueryDeleteOrDropQL(w http.ResponseWriter, req *http.Request, tokens []string) (err error)

func (*InfluxQLExecutor) QueryShowQL added in v1.3.0

func (iqe *InfluxQLExecutor) QueryShowQL(w http.ResponseWriter, req *http.Request, tokens []string) (err error)

type Message added in v1.1.0

type Message struct {
	Level string `json:"level"`
	Text  string `json:"text"`
}

Message represents a user-facing message to be included with the result.

type NodeConfig

type NodeConfig struct {
	ListenAddr   string `mapstructure:"listen_addr"`
	DB           string `mapstructure:"db"`
	Username     string `mapstructure:"username"`
	Password     string `mapstructure:"password"`
	DataDir      string `mapstructure:"data_dir"`
	LogPath      string `mapstructure:"log_path"`
	IdleTimeout  int    `mapstructure:"idle_timeout"`
	StatInterval int    `mapstructure:"stat_interval"`
	WriteTracing bool   `mapstructure:"write_tracing"`
	QueryTracing bool   `mapstructure:"query_tracing"`
	HTTPSEnabled bool   `mapstructure:"https_enabled"`
	HTTPSCert    string `mapstructure:"https_cert"`
	HTTPSKey     string `mapstructure:"https_key"`
}

type Querier

type Querier interface {
	Query(w http.ResponseWriter, req *http.Request) (err error)
}

type QueryResult added in v1.4.5

type QueryResult struct {
	Header http.Header
	Status int
	Body   []byte
	Err    error
}

type Response added in v1.3.1

type Response struct {
	Results []*Result `json:"results,omitempty"`
	Err     string    `json:"error,omitempty"`
}

Response represents a list of statement results.

func ResponseFromError added in v1.4.6

func ResponseFromError(err string) (rsp *Response)

func ResponseFromResponseBytes added in v1.4.6

func ResponseFromResponseBytes(b []byte) (rsp *Response, e error)

func ResponseFromResults added in v1.4.1

func ResponseFromResults(results []*Result) (rsp *Response)

func ResponseFromSeries added in v1.4.1

func ResponseFromSeries(series models.Rows) (rsp *Response)

func (*Response) Marshal added in v1.4.1

func (rsp *Response) Marshal(indent bool) (b []byte)

func (*Response) Unmarshal added in v1.4.8

func (rsp *Response) Unmarshal(b []byte) (e error)

type Result added in v1.1.0

type Result struct {
	// StatementID is just the statement's position in the query. It's used
	// to combine statement results if they're being buffered in memory.
	StatementID int         `json:"statement_id"`
	Series      models.Rows `json:"series,omitempty"`
	Messages    []*Message  `json:"messages,omitempty"`
	Partial     bool        `json:"partial,omitempty"`
	Err         string      `json:"error,omitempty"`
}

Result represents a resultset returned from a single statement. Rows represents a list of rows that can be sorted consistently by name/tag.

func ResultsFromResponseBytes added in v1.3.3

func ResultsFromResponseBytes(b []byte) (results []*Result, e error)

type Statistics

type Statistics struct {
	QueryRequests        int64
	QueryRequestsFail    int64
	WriteRequests        int64
	WriteRequestsFail    int64
	PingRequests         int64
	PingRequestsFail     int64
	PointsWritten        int64
	PointsWrittenFail    int64
	WriteRequestDuration int64
	QueryRequestDuration int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL