v1.7.9 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2019 License: MIT Imports: 21 Imported by: 0



Package subscriber implements the subscriber service to forward incoming data to remote services.



View Source
const (
	// DefaultHTTPTimeout is the default HTTP timeout for a Config.
	DefaultHTTPTimeout = 30 * time.Second

	// DefaultWriteConcurrency is the default write concurrency for a Config.
	DefaultWriteConcurrency = 40

	// DefaultWriteBufferSize is the default write buffer size for a Config.
	DefaultWriteBufferSize = 1000


This section is empty.


This section is empty.


type BalanceMode

type BalanceMode int

BalanceMode specifies what balance mode to use on a subscription.

const (
	// ALL indicates to send writes to all subscriber destinations.
	ALL BalanceMode = iota

	// ANY indicates to send writes to a single subscriber destination, round robin.

type Config

type Config struct {
	// Whether to enable to Subscriber service
	Enabled bool `toml:"enabled"`

	HTTPTimeout toml.Duration `toml:"http-timeout"`

	// InsecureSkipVerify gets passed to the http client, if true, it will
	// skip https certificate verification. Defaults to false
	InsecureSkipVerify bool `toml:"insecure-skip-verify"`

	// configure the path to the PEM encoded CA certs file. If the
	// empty string, the default system certs will be used
	CaCerts string `toml:"ca-certs"`

	// The number of writer goroutines processing the write channel.
	WriteConcurrency int `toml:"write-concurrency"`

	// The number of in-flight writes buffered in the write channel.
	WriteBufferSize int `toml:"write-buffer-size"`

	// TLS is a base tls config to use for https clients.
	TLS *tls.Config `toml:"-"`

Config represents a configuration of the subscriber service.

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of a subscriber config.

func (Config) Diagnostics added in v1.3.0

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

func (Config) Validate added in v1.0.0

func (c Config) Validate() error

Validate returns an error if the config is invalid.

type HTTP added in v1.0.0

type HTTP struct {
	// contains filtered or unexported fields

HTTP supports writing points over HTTP using the line protocol.

func NewHTTP added in v1.0.0

func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)

NewHTTP returns a new HTTP points writer with default options.

func NewHTTPS added in v1.1.0

func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string, tlsConfig *tls.Config) (*HTTP, error)

NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.

func (*HTTP) WritePoints added in v1.0.0

func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over HTTP transport.

type PointsWriter

type PointsWriter interface {
	WritePoints(p *coordinator.WritePointsRequest) error

PointsWriter is an interface for writing points to a subscription destination. Only WritePoints() needs to be satisfied. PointsWriter implementations must be goroutine safe.

type Service

type Service struct {
	MetaClient interface {
		Databases() []meta.DatabaseInfo
		WaitForDataChanged() chan struct{}
	NewPointsWriter func(u url.URL) (PointsWriter, error)
	Logger          *zap.Logger
	// contains filtered or unexported fields

Service manages forking the incoming data from InfluxDB to defined third party destinations. Subscriptions are defined per database and retention policy.

func NewService

func NewService(c Config) *Service

NewService returns a subscriber service with given settings

func (*Service) Close

func (s *Service) Close() error

Close terminates the subscription service. It will panic if called multiple times or without first opening the service.

func (*Service) Open

func (s *Service) Open() error

Open starts the subscription service.

func (*Service) Points

func (s *Service) Points() chan<- *coordinator.WritePointsRequest

Points returns a channel into which write point requests can be sent.

func (*Service) Statistics added in v1.0.0

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) Update

func (s *Service) Update() error

Update will start new and stop deleted subscriptions.

func (*Service) WithLogger added in v1.2.0

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type Statistics added in v1.0.0

type Statistics struct {
	CreateFailures int64
	PointsWritten  int64
	WriteFailures  int64

Statistics maintains the statistics for the subscriber service.

type UDP

type UDP struct {
	// contains filtered or unexported fields

UDP supports writing points over UDP using the line protocol.

func NewUDP

func NewUDP(addr string) *UDP

NewUDP returns a new UDP listener with default options.

func (*UDP) WritePoints

func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over UDP transport.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL