subscriber

package
v1.5.1-0...-4d21ab9 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 7, 2019 | License: MIT | Imports: 21 | Imported by: 0

Documentation

Overview

Package subscriber implements the subscriber service to forward incoming data to remote services.

Constants

const (
	// DefaultHTTPTimeout is the default HTTP timeout for a Config.
	DefaultHTTPTimeout = 30 * time.Second

	// DefaultWriteConcurrency is the default write concurrency for a Config.
	DefaultWriteConcurrency = 40

	// DefaultWriteBufferSize is the default write buffer size for a Config.
	DefaultWriteBufferSize = 1000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BalanceMode

type BalanceMode int

BalanceMode specifies what balance mode to use on a subscription.

const (
	// ALL indicates to send writes to all subscriber destinations.
	ALL BalanceMode = iota

	// ANY indicates to send writes to a single subscriber destination, round robin.
	ANY
)
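
The difference between the two modes can be sketched with a small dispatcher. This is an illustration only: `mode`, `balancer`, and `targets` are hypothetical names that do not exist in the package, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// mode is a local stand-in for subscriber.BalanceMode.
type mode int

const (
	modeAll mode = iota // corresponds to ALL: every destination
	modeAny             // corresponds to ANY: one destination, round robin
)

// balancer is a hypothetical dispatcher; it is not part of the package.
type balancer struct {
	mode  mode
	dests []string
	next  int // index of the next destination used in modeAny
}

// targets returns the destinations a single write would be sent to.
func (b *balancer) targets() []string {
	if len(b.dests) == 0 {
		return nil
	}
	if b.mode == modeAll {
		return b.dests
	}
	d := b.dests[b.next%len(b.dests)]
	b.next++
	return []string{d}
}

func main() {
	b := &balancer{mode: modeAny, dests: []string{"a", "b"}}
	for i := 0; i < 3; i++ {
		fmt.Println(b.targets()) // prints [a], then [b], then [a]
	}
}
```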

type Config

type Config struct {
	// Whether to enable the Subscriber service
	Enabled bool `toml:"enabled"`

	// HTTPTimeout is the timeout applied to HTTP writes.
	HTTPTimeout toml.Duration `toml:"http-timeout"`

	// InsecureSkipVerify is passed to the HTTP client. If true, the client
	// skips HTTPS certificate verification. Defaults to false.
	InsecureSkipVerify bool `toml:"insecure-skip-verify"`

	// CaCerts is the path to the PEM-encoded CA certs file. If it is the
	// empty string, the default system certs are used.
	CaCerts string `toml:"ca-certs"`

	// The number of writer goroutines processing the write channel.
	WriteConcurrency int `toml:"write-concurrency"`

	// The number of in-flight writes buffered in the write channel.
	WriteBufferSize int `toml:"write-buffer-size"`

	// TLS is a base tls config to use for https clients.
	TLS *tls.Config `toml:"-"`
}

Config represents a configuration of the subscriber service.
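
The WriteConcurrency and WriteBufferSize comments describe a buffered channel drained by a fixed number of goroutines. A minimal sketch of that pattern follows, assuming the service works roughly this way; `process` and its parameters are hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// process pushes each item through a buffered channel that is drained by
// `concurrency` goroutines, and returns how many items were handled.
func process(items []string, concurrency, bufferSize int, handle func(string)) int64 {
	ch := make(chan string, bufferSize) // plays the role of the write channel
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range ch {
				handle(it)
				atomic.AddInt64(&handled, 1)
			}
		}()
	}
	for _, it := range items {
		ch <- it // blocks only while the buffer is full
	}
	close(ch)
	wg.Wait()
	return handled
}

func main() {
	// 40 and 1000 match DefaultWriteConcurrency and DefaultWriteBufferSize.
	n := process([]string{"cpu value=1", "mem value=2"}, 40, 1000, func(string) {})
	fmt.Println(n) // prints 2
}
```

A larger buffer absorbs bursts without blocking the sender; more goroutines help when destinations are slow.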

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of a subscriber config.

func (Config) Diagnostics

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the config is invalid.

type HTTP

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP supports writing points over HTTP using the line protocol.

func NewHTTP

func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)

NewHTTP returns a new HTTP points writer with default options.

func NewHTTPS

func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string, tlsConfig *tls.Config) (*HTTP, error)

NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.
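
How the unsafeSsl, caCerts, and tlsConfig arguments might combine into a client TLS configuration can be sketched with the standard library alone. `clientTLS` is a hypothetical helper, and the merge order is an assumption rather than the package's documented behavior.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// clientTLS builds a TLS config from an optional base config, an optional
// path to a PEM-encoded CA file, and a skip-verification flag.
func clientTLS(base *tls.Config, caCerts string, skipVerify bool) (*tls.Config, error) {
	cfg := &tls.Config{}
	if base != nil {
		cfg = base.Clone() // avoid mutating the caller's config
	}
	cfg.InsecureSkipVerify = skipVerify
	if caCerts == "" {
		return cfg, nil // with a nil base, RootCAs stays nil: system pool
	}
	pem, err := os.ReadFile(caCerts)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return nil, errors.New("no certificates found in " + caCerts)
	}
	cfg.RootCAs = pool
	return cfg, nil
}

func main() {
	cfg, err := clientTLS(nil, "", false)
	fmt.Println(cfg.RootCAs == nil, err) // prints true <nil>
}
```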

func (*HTTP) WritePoints

func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over HTTP transport.

type PointsWriter

type PointsWriter interface {
	WritePoints(p *coordinator.WritePointsRequest) error
}

PointsWriter is an interface for writing points to a subscription destination. Only WritePoints() needs to be satisfied. PointsWriter implementations must be goroutine safe.

type Service

type Service struct {
	MetaClient interface {
		Databases() []meta.DatabaseInfo
		WaitForDataChanged() chan struct{}
	}
	NewPointsWriter func(u url.URL) (PointsWriter, error)
	Logger          *zap.Logger
	// contains filtered or unexported fields
}

Service manages forking the incoming data from InfluxDB to defined third-party destinations. Subscriptions are defined per database and retention policy.

func NewService

func NewService(c Config) *Service

NewService returns a subscriber service with the given settings.

func (*Service) Close

func (s *Service) Close() error

Close terminates the subscription service. It will panic if called multiple times or without first opening the service.
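
Callers that cannot guarantee a single Close call may want a guard around it. One possible approach, sketched with sync.Once: `onceCloser` is a hypothetical wrapper, not part of the package, and it does not cover the case of closing a service that was never opened.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser forwards only the first Close call to the wrapped function
// and returns that call's result on every later call.
type onceCloser struct {
	once    sync.Once
	closeFn func() error
	err     error
}

// Close runs closeFn at most once, even under concurrent calls.
func (c *onceCloser) Close() error {
	c.once.Do(func() { c.err = c.closeFn() })
	return c.err
}

func main() {
	calls := 0
	// In real use closeFn would be the Close method of an opened service.
	c := &onceCloser{closeFn: func() error { calls++; return nil }}
	c.Close()
	c.Close()
	fmt.Println(calls) // prints 1
}
```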

func (*Service) Open

func (s *Service) Open() error

Open starts the subscription service.

func (*Service) Points

func (s *Service) Points() chan<- *coordinator.WritePointsRequest

Points returns a channel into which write point requests can be sent.
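
Because the returned channel is send-only, a producer decides what happens when it is full. The sketch below shows a non-blocking send that drops the request instead of stalling; `request` is a stand-in for coordinator.WritePointsRequest, `offer` is a hypothetical helper, and whether callers of the real service drop or block is not stated on this page.

```go
package main

import "fmt"

// request is a local stand-in for coordinator.WritePointsRequest.
type request struct {
	Database string
}

// offer attempts a non-blocking send and reports whether it succeeded.
func offer(ch chan<- *request, r *request) bool {
	select {
	case ch <- r:
		return true
	default:
		return false // channel full: the caller may drop or retry
	}
}

func main() {
	ch := make(chan *request, 1) // capacity 1 and no reader
	fmt.Println(offer(ch, &request{Database: "db0"})) // prints true
	fmt.Println(offer(ch, &request{Database: "db0"})) // prints false
}
```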

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) Update

func (s *Service) Update() error

Update will start new and stop deleted subscriptions.
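
Starting new and stopping deleted subscriptions amounts to a set difference between what is running and what is currently defined. A minimal sketch, assuming subscriptions are identified by a string key; `reconcile` and the key format are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the running subscription keys with the desired ones
// and returns which to start and which to stop, each sorted.
func reconcile(running map[string]bool, desired []string) (start, stop []string) {
	want := make(map[string]bool, len(desired))
	for _, name := range desired {
		want[name] = true
		if !running[name] {
			start = append(start, name)
		}
	}
	for name := range running {
		if !want[name] {
			stop = append(stop, name)
		}
	}
	sort.Strings(start)
	sort.Strings(stop)
	return start, stop
}

func main() {
	running := map[string]bool{"db0.autogen.sub0": true, "db0.autogen.old": true}
	start, stop := reconcile(running, []string{"db0.autogen.sub0", "db0.autogen.new"})
	fmt.Println(start, stop) // prints [db0.autogen.new] [db0.autogen.old]
}
```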

func (*Service) WithLogger

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type Statistics

type Statistics struct {
	CreateFailures int64
	PointsWritten  int64
	WriteFailures  int64
}

Statistics maintains the statistics for the subscriber service.

type UDP

type UDP struct {
	// contains filtered or unexported fields
}

UDP supports writing points over UDP using the line protocol.

func NewUDP

func NewUDP(addr string) *UDP

NewUDP returns a new UDP points writer with default options.

func (*UDP) WritePoints

func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over UDP transport.
