Back to

Package subscriber

Latest Go to latest

The highest tagged major version is .

Published: Aug 20, 2019 | License: MIT | Module:


Package subscriber implements the subscriber service to forward incoming data to remote services.



const (
	// DefaultHTTPTimeout is the default HTTP timeout for a Config.
	DefaultHTTPTimeout = 30 * time.Second

	// DefaultWriteConcurrency is the default write concurrency for a Config.
	DefaultWriteConcurrency = 40

	// DefaultWriteBufferSize is the default write buffer size for a Config.
	DefaultWriteBufferSize = 1000

type BalanceMode

type BalanceMode int

BalanceMode specifies what balance mode to use on a subscription.

const (
	// ALL indicates to send writes to all subscriber destinations.
	ALL BalanceMode = iota

	// ANY indicates to send writes to a single subscriber destination, round robin.

type Config

type Config struct {
	// Whether to enable to Subscriber service
	Enabled bool `toml:"enabled"`

	HTTPTimeout toml.Duration `toml:"http-timeout"`

	// InsecureSkipVerify gets passed to the http client, if true, it will
	// skip https certificate verification. Defaults to false
	InsecureSkipVerify bool `toml:"insecure-skip-verify"`

	// configure the path to the PEM encoded CA certs file. If the
	// empty string, the default system certs will be used
	CaCerts string `toml:"ca-certs"`

	// The number of writer goroutines processing the write channel.
	WriteConcurrency int `toml:"write-concurrency"`

	// The number of in-flight writes buffered in the write channel.
	WriteBufferSize int `toml:"write-buffer-size"`

	// TLS is a base tls config to use for https clients.
	TLS *tls.Config `toml:"-"`

Config represents a configuration of the subscriber service.

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of a subscriber config.

func (Config) Diagnostics

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the config is invalid.

type HTTP

type HTTP struct {
	// contains filtered or unexported fields

HTTP supports writing points over HTTP using the line protocol.

func NewHTTP

func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)

NewHTTP returns a new HTTP points writer with default options.

func NewHTTPS

func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string, tlsConfig *tls.Config) (*HTTP, error)

NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.

func (*HTTP) WritePoints

func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over HTTP transport.

type PointsWriter

type PointsWriter interface {
	WritePoints(p *coordinator.WritePointsRequest) error

PointsWriter is an interface for writing points to a subscription destination. Only WritePoints() needs to be satisfied. PointsWriter implementations must be goroutine safe.

type Service

type Service struct {
	MetaClient interface {
		Databases() []meta.DatabaseInfo
		WaitForDataChanged() chan struct{}
	NewPointsWriter func(u url.URL) (PointsWriter, error)
	Logger          *zap.Logger
	// contains filtered or unexported fields

Service manages forking the incoming data from InfluxDB to defined third party destinations. Subscriptions are defined per database and retention policy.

func NewService

func NewService(c Config) *Service

NewService returns a subscriber service with given settings

func (*Service) Close

func (s *Service) Close() error

Close terminates the subscription service. It will panic if called multiple times or without first opening the service.

func (*Service) Open

func (s *Service) Open() error

Open starts the subscription service.

func (*Service) Points

func (s *Service) Points() chan<- *coordinator.WritePointsRequest

Points returns a channel into which write point requests can be sent.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) Update

func (s *Service) Update() error

Update will start new and stop deleted subscriptions.

func (*Service) WithLogger

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type Statistics

type Statistics struct {
	CreateFailures int64
	PointsWritten  int64
	WriteFailures  int64

Statistics maintains the statistics for the subscriber service.

type UDP

type UDP struct {
	// contains filtered or unexported fields

UDP supports writing points over UDP using the line protocol.

func NewUDP

func NewUDP(addr string) *UDP

NewUDP returns a new UDP listener with default options.

func (*UDP) WritePoints

func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over UDP transport.

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier