server

package
v1.0.3
Warning

This package is not in the latest version of its module.

Published: Jun 17, 2022 License: MIT Imports: 65 Imported by: 1

Documentation

Constants

const (
	// DefaultTCPBindAddress is the default address for various RPC services.
	DefaultTCPBindAddress = "127.0.0.1:8088"
	DefaultCluster        = false
	DefaultHostname       = "localhost"
)
const (
	// DefaultBindAddress is the default address to bind to.
	DefaultBindAddress = ":8086"

	// DefaultRealm is the default realm sent back when issuing a basic auth challenge.
	DefaultRealm = "CnosDB"

	// DefaultBindSocket is the default unix socket to bind to.
	DefaultBindSocket = "/var/run/cnosdb.sock"

	// DefaultMaxBodySize is the default maximum size of a client request body, in bytes. Specify 0 for no limit.
	DefaultMaxBodySize = 25e6

	// DefaultEnqueuedWriteTimeout is the maximum time a write request can wait to be processed.
	DefaultEnqueuedWriteTimeout = 30 * time.Second
)
const (
	// DefaultChunkSize specifies the maximum number of points that will
	// be read before sending results back to the engine.
	//
	// This has no relation to the number of bytes that are returned.
	DefaultChunkSize = 10000

	DefaultDebugRequestsInterval = 10 * time.Second

	MaxDebugRequestsInterval = 6 * time.Hour
)
const (
	RequestClusterJoin uint8 = iota
	RequestUpdateDataNode
	RequestReplaceDataNode
	RequestClusterPreJoin
)
const NodeMuxHeader = "node"

Variables

This section is empty.

Functions

func WrapWithAuthenticate

func WrapWithAuthenticate(inner serveAuthenticateFunc, conf *HTTPConfig, metaCli meta.MetaClient) http.Handler

WrapWithAuthenticate wraps a Handler and ensures that, if user credentials are passed in, an attempt is made to authenticate that user. If authentication fails, an error is returned.

There is one exception: if there are no users in the system, authentication is not required. This is to facilitate bootstrapping of a system with authentication enabled.

func WrapWithCors

func WrapWithCors(inner http.Handler) http.Handler

WrapWithCors responds to incoming requests and adds the appropriate CORS headers.

func WrapWithGzipResponseWriter

func WrapWithGzipResponseWriter(inner http.Handler) http.Handler

WrapWithGzipResponseWriter determines if the client can accept compressed responses, and encodes accordingly.

func WrapWithRecovery

func WrapWithRecovery(inner http.Handler) http.Handler

WrapWithRecovery

func WrapWithRequestID

func WrapWithRequestID(inner http.Handler) http.Handler

WrapWithRequestID

func WrapWithResponseWriter

func WrapWithResponseWriter(inner http.Handler) http.Handler

Types

type AuthenticationMethod

type AuthenticationMethod int

AuthenticationMethod is the authentication method.

const (
	// UserAuthentication authenticates using basic authentication.
	UserAuthentication AuthenticationMethod = iota

	// BearerAuthentication authenticates using JWT.
	BearerAuthentication
)

The currently supported authentication methods.

type Config

type Config struct {
	// BindAddress is the address that all TCP services use (Raft, Snapshot, Cluster, etc.)
	BindAddress string `toml:"bind-address"`
	Cluster     bool   `toml:"cluster"`
	Hostname    string `toml:"hostname"`
	// Server reporting
	ReportingDisabled bool `toml:"reporting-disabled"`

	Meta            *meta.Config
	Data            tsdb.Config
	Coordinator     coordinator.Config
	RetentionPolicy rp.Config
	Precreator      precreator.Config

	Monitor         monitor.Config
	Subscriber      subscriber.Config
	HTTPD           HTTPConfig
	Log             *logger.Config
	ContinuousQuery continuous_querier.Config
	HintedHandoff   hh.Config
	TLS             tlsconfig.Config
}

func NewConfig

func NewConfig() *Config

NewConfig returns an instance of Config with reasonable defaults.

func NewDemoConfig

func NewDemoConfig() (*Config, error)

NewDemoConfig returns the config that runs when no config is specified.

func (*Config) ApplyEnvOverrides

func (c *Config) ApplyEnvOverrides(getenv func(string) string) error

ApplyEnvOverrides applies the environment configuration on top of the config.

func (*Config) FromToml

func (c *Config) FromToml(input string) error

FromToml loads the config from TOML.

func (*Config) FromTomlFile

func (c *Config) FromTomlFile(fpath string) error

FromTomlFile loads the config from a TOML file.

func (*Config) Validate

func (c *Config) Validate() error

Validate returns an error if the config is invalid.

type HTTPConfig

type HTTPConfig struct {
	Enabled                 bool           `toml:"enabled"`
	BindAddress             string         `toml:"bind-address"`
	AuthEnabled             bool           `toml:"auth-enabled"`
	LogEnabled              bool           `toml:"log-enabled"`
	SuppressWriteLog        bool           `toml:"suppress-write-log"`
	WriteTracing            bool           `toml:"write-tracing"`
	PprofEnabled            bool           `toml:"pprof-enabled"`
	DebugPprofEnabled       bool           `toml:"debug-pprof-enabled"`
	HTTPSEnabled            bool           `toml:"https-enabled"`
	HTTPSCertificate        string         `toml:"https-certificate"`
	HTTPSPrivateKey         string         `toml:"https-private-key"`
	MaxRowLimit             int            `toml:"max-row-limit"`
	MaxConnectionLimit      int            `toml:"max-connection-limit"`
	SharedSecret            string         `toml:"shared-secret"`
	Realm                   string         `toml:"realm"`
	UnixSocketEnabled       bool           `toml:"unix-socket-enabled"`
	UnixSocketGroup         *toml.Group    `toml:"unix-socket-group"`
	UnixSocketPermissions   toml.FileMode  `toml:"unix-socket-permissions"`
	BindSocket              string         `toml:"bind-socket"`
	MaxBodySize             int            `toml:"max-body-size"`
	AccessLogPath           string         `toml:"access-log-path"`
	AccessLogStatusFilters  []StatusFilter `toml:"access-log-status-filters"`
	MaxConcurrentWriteLimit int            `toml:"max-concurrent-write-limit"`
	MaxEnqueuedWriteLimit   int            `toml:"max-enqueued-write-limit"`
	EnqueuedWriteTimeout    time.Duration  `toml:"enqueued-write-timeout"`
	TLS                     *tls.Config    `toml:"-"`
}

func NewHTTPConfig

func NewHTTPConfig() HTTPConfig

type Handler

type Handler struct {
	Version string

	QueryAuthorizer QueryAuthorizer

	WriteAuthorizer interface {
		AuthorizeWrite(username, database string) error
	}

	QueryExecutor *query.Executor

	StorageStore *storage.Store

	Monitor interface {
		Statistics(tags map[string]string) ([]*monitor.Statistic, error)
		Diagnostics() (map[string]*diagnostics.Diagnostics, error)
	}

	PointsWriter interface {
		WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
	}
	// contains filtered or unexported fields
}

Handler implements the handling logic for HTTP requests.

func NewHandler

func NewHandler(conf *HTTPConfig) *Handler

NewHandler creates an instance of Handler and sets up the router.

func (*Handler) AddRoutes

func (h *Handler) AddRoutes(routes ...route)

AddRoutes sets the provided routes on the Handler.

func (*Handler) Open

func (h *Handler) Open()

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP responds to HTTP requests.

func (*Handler) Statistics

func (h *Handler) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Handler) WrapWithLogger added in v1.0.1

func (h *Handler) WrapWithLogger(inner http.Handler, filters []StatusFilter) http.Handler

WrapWithLogger

type NodeRequest added in v1.0.3

type NodeRequest struct {
	Type     uint8
	NodeAddr string
	OldAddr  string
	Peers    []string
}

type NodeResponse added in v1.0.3

type NodeResponse struct {
	StatusCode uint32
	Message    string
}

type QueryAuthorizer

type QueryAuthorizer interface {
	AuthorizeQuery(u meta.User, query *cnosql.Query, database string) (query.FineAuthorizer, error)
	AuthorizeDatabase(u meta.User, priv cnosql.Privilege, database string) error
}

type RequestInfo

type RequestInfo struct {
	IPAddr   string
	Username string
}

func (*RequestInfo) String

func (r *RequestInfo) String() string

type RequestProfile

type RequestProfile struct {
	Requests map[RequestInfo]*RequestStats
	// contains filtered or unexported fields
}

func (*RequestProfile) AddQuery

func (p *RequestProfile) AddQuery(info RequestInfo)

func (*RequestProfile) AddWrite

func (p *RequestProfile) AddWrite(info RequestInfo)

func (*RequestProfile) Stop

func (p *RequestProfile) Stop()

Stop informs the RequestTracker to stop collecting statistics for this profile.

type RequestStats

type RequestStats struct {
	Writes  int64 `json:"writes"`
	Queries int64 `json:"queries"`
}

type RequestTracker

type RequestTracker struct {
	// contains filtered or unexported fields
}

func NewRequestTracker

func NewRequestTracker() *RequestTracker

func (*RequestTracker) Add

func (rt *RequestTracker) Add(req *http.Request, user meta.User)

func (*RequestTracker) TrackRequests

func (rt *RequestTracker) TrackRequests() *RequestProfile

type Response

type Response struct {
	Results []*query.Result
	Err     error
}

Response represents a list of statement results.

func (*Response) Error

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.

func (Response) MarshalJSON

func (r Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes a Response struct into JSON.

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct.

type ResponseLogger

type ResponseLogger struct {
	// contains filtered or unexported fields
}

ResponseLogger is a wrapper around http.ResponseWriter that keeps track of its HTTP status code and body size.

func NewResponseLogger

func NewResponseLogger(w http.ResponseWriter) *ResponseLogger

func (*ResponseLogger) CloseNotify

func (l *ResponseLogger) CloseNotify() <-chan bool

func (*ResponseLogger) Flush

func (l *ResponseLogger) Flush()

func (*ResponseLogger) Header

func (l *ResponseLogger) Header() http.Header

func (*ResponseLogger) Size

func (l *ResponseLogger) Size() int

func (*ResponseLogger) Status

func (l *ResponseLogger) Status() int

func (*ResponseLogger) Write

func (l *ResponseLogger) Write(b []byte) (int, error)

func (*ResponseLogger) WriteHeader

func (l *ResponseLogger) WriteHeader(s int)

type ResponseWriter

type ResponseWriter interface {
	// WriteResponse writes a response.
	WriteResponse(resp Response) (int, error)

	http.ResponseWriter
}

ResponseWriter is an interface for writing a response.

func NewResponseWriter

func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter

NewResponseWriter creates a new ResponseWriter that wraps w, based on the Accept header in the request.

type Server

type Server struct {
	Config *Config

	Node *cnosdb.Node

	meta.MetaClient

	TSDBStore *tsdb.Store

	PointsWriter *coordinator.PointsWriter

	// Profiling
	CPUProfile            string
	CPUProfileWriteCloser io.WriteCloser
	MemProfile            string
	MemProfileWriteCloser io.WriteCloser

	Logger *zap.Logger
	// contains filtered or unexported fields
}

func NewServer

func NewServer(c *Config) *Server

func (*Server) Close

func (s *Server) Close()

func (*Server) Err

func (s *Server) Err() <-chan error

Err returns an error channel that multiplexes all out-of-band errors received from all services.

func (*Server) HTTPAddr

func (s *Server) HTTPAddr() string

HTTPAddr returns the HTTP address used by other nodes for HTTP queries and writes. TODO: get the address dynamically.

func (*Server) Open

func (s *Server) Open() error

func (*Server) Statistics

func (s *Server) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for the services running in the Server.

func (*Server) TCPAddr

func (s *Server) TCPAddr() string

TCPAddr returns the TCP address used by other nodes for cluster communication.

func (*Server) URL added in v1.0.1

func (s *Server) URL() string

type Statistics

type Statistics struct {
	Requests                     int64
	CQRequests                   int64
	QueryRequests                int64
	WriteRequests                int64
	PingRequests                 int64
	StatusRequests               int64
	WriteRequestBytesReceived    int64
	QueryRequestBytesTransmitted int64
	PointsWrittenOK              int64
	ValuesWrittenOK              int64
	PointsWrittenDropped         int64
	PointsWrittenFail            int64
	AuthenticationFailures       int64
	RequestDuration              int64
	QueryRequestDuration         int64
	WriteRequestDuration         int64
	ActiveRequests               int64
	ActiveWriteRequests          int64
	ClientErrors                 int64
	ServerErrors                 int64
	RecoveredPanics              int64
	PromWriteRequests            int64
	PromReadRequests             int64
}

Statistics maintains statistics for the httpd service.

type StatusFilter

type StatusFilter struct {
	// contains filtered or unexported fields
}

StatusFilter is a pattern for HTTP status codes (statusCode % divisor = base).

func ParseStatusFilter

func ParseStatusFilter(s string) (StatusFilter, error)

ParseStatusFilter creates a StatusFilter from the string s.

func (StatusFilter) MarshalText

func (sf StatusFilter) MarshalText() (text []byte, err error)

MarshalText converts the filter to a TOML parameter.

func (StatusFilter) Match

func (sf StatusFilter) Match(statusCode int) bool

Match reports whether the HTTP status code matches the filter.
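A base/divisor filter can be sketched as below. Everything here is an assumption rather than this package's confirmed behavior: the names `statusPattern`, `parsePattern`, and `match` are hypothetical, the pattern syntax follows the prefix style (`5xx`, `40x`, `404`) used by InfluxDB's access-log status filters, and matching uses integer division (`statusCode / divisor == base`), which is what a prefix pattern requires.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// statusPattern matches status codes whose leading digits equal base.
type statusPattern struct {
	base    int
	divisor int
}

// parsePattern accepts three-character patterns such as "5xx", "40x" or "404".
func parsePattern(s string) (statusPattern, error) {
	digits := strings.TrimRight(strings.ToLower(s), "x")
	if len(s) != 3 || digits == "" {
		return statusPattern{}, fmt.Errorf("invalid status pattern %q", s)
	}
	for _, c := range digits {
		if c < '0' || c > '9' {
			return statusPattern{}, fmt.Errorf("invalid status pattern %q", s)
		}
	}
	base, err := strconv.Atoi(digits)
	if err != nil {
		return statusPattern{}, err
	}
	divisor := 1
	for i := len(digits); i < len(s); i++ {
		divisor *= 10 // one power of ten per wildcard position
	}
	return statusPattern{base: base, divisor: divisor}, nil
}

// match reports whether code falls under the pattern.
func (p statusPattern) match(code int) bool {
	return p.divisor != 0 && code/p.divisor == p.base
}

func main() {
	p, err := parsePattern("5xx")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.match(503), p.match(404)) // true false
}
```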

func (*StatusFilter) UnmarshalText

func (sf *StatusFilter) UnmarshalText(text []byte) error

UnmarshalText 解析 TOML 参数,并设置该 Filter

type StatusFilters

type StatusFilters []StatusFilter

func (StatusFilters) Match

func (filters StatusFilters) Match(statusCode int) bool

type Throttler

type Throttler struct {

	// Maximum amount of time requests can wait in queue.
	// Must be set before adding middleware.
	EnqueueTimeout time.Duration

	Logger *zap.Logger
	// contains filtered or unexported fields
}

Throttler represents an HTTP throttler that limits the number of concurrent requests being processed as well as the number of enqueued requests.

func NewThrottler

func NewThrottler(concurrentN, maxEnqueueN int) *Throttler

NewThrottler returns a new instance of Throttler that limits to concurrentN requests processed at a time and maxEnqueueN requests waiting to be processed.

func (*Throttler) WrapWithThrottler

func (t *Throttler) WrapWithThrottler(h http.Handler) http.Handler

WrapWithThrottler wraps h in a middleware Handler that throttles requests.

Directories

Path Synopsis
Package continuous_querier provides the continuous query service.
Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.
Package precreator provides the shard precreation service.
Package rp provides the retention policy enforcement service.
Package snapshotter provides the meta snapshot service.
