Version: v1.10.0

Published: Aug 2, 2022 License: MIT Imports: 23 Imported by: 2



Package subscriber implements the subscriber service to forward incoming data to remote services.



const (
	// DefaultHTTPTimeout is the default HTTP timeout for a Config.
	DefaultHTTPTimeout = 30 * time.Second

	// DefaultWriteConcurrency is the default write concurrency for a Config.
	DefaultWriteConcurrency = 40

	// DefaultWriteBufferSize is the default write buffer size for a Config.
	DefaultWriteBufferSize = 1000
)




type BalanceMode

type BalanceMode int

BalanceMode specifies what balance mode to use on a subscription.

const (
	// ALL indicates to send writes to all subscriber destinations.
	ALL BalanceMode = iota

	// ANY indicates to send writes to a single subscriber destination, round robin.
	ANY
)
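The difference between the two modes can be sketched as follows. This is a minimal illustration, not the package's implementation: pickDestinations and its round-robin counter are hypothetical names, and BalanceMode is redeclared locally so the program is self-contained.

```go
package main

import "fmt"

// BalanceMode is a local copy of the documented type, for illustration only.
type BalanceMode int

const (
	ALL BalanceMode = iota
	ANY
)

// pickDestinations is a hypothetical helper. It returns the destinations
// that a single write should be sent to. In ANY mode, next is a counter
// that the caller increments after every write.
func pickDestinations(mode BalanceMode, dests []string, next uint64) []string {
	if len(dests) == 0 {
		return nil
	}
	switch mode {
	case ALL:
		// Every destination receives the write.
		return dests
	case ANY:
		// Exactly one destination receives the write, chosen in rotation.
		return []string{dests[next%uint64(len(dests))]}
	default:
		return nil
	}
}

func main() {
	dests := []string{"udp://a:9090", "udp://b:9090"}
	fmt.Println(pickDestinations(ALL, dests, 0))
	for i := uint64(0); i < 3; i++ {
		fmt.Println(pickDestinations(ANY, dests, i))
	}
}
```

With two destinations, ANY alternates between them on successive writes, while ALL always returns both.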

type Config

type Config struct {
	// Whether to enable the Subscriber service.
	Enabled bool `toml:"enabled"`

	HTTPTimeout toml.Duration `toml:"http-timeout"`

	// InsecureSkipVerify is passed to the HTTP client. If true, the client
	// skips HTTPS certificate verification. Defaults to false.
	InsecureSkipVerify bool `toml:"insecure-skip-verify"`

	// CaCerts is the path to the PEM-encoded CA certs file. If it is the
	// empty string, the default system certs are used.
	CaCerts string `toml:"ca-certs"`

	// The number of writer goroutines processing the write channel.
	WriteConcurrency int `toml:"write-concurrency"`

	// The number of in-flight writes buffered in the write channel.
	WriteBufferSize int `toml:"write-buffer-size"`

	// TotalBufferBytes is the total size in bytes allocated to buffering across all subscriptions.
	// Each named subscription will receive an even division of the total.
	TotalBufferBytes int `toml:"total-buffer-bytes"`

	// TLS is a base tls config to use for https clients.
	TLS *tls.Config `toml:"-"`
}

Config represents a configuration of the subscriber service.
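The TotalBufferBytes comment says each named subscription receives an even division of the total. A minimal sketch of that arithmetic, assuming plain integer division; perSubscriptionBytes is a hypothetical helper and the package may round or clamp differently.

```go
package main

import "fmt"

// perSubscriptionBytes is a hypothetical helper showing an even split of
// the total buffer across the named subscriptions. Integer division is an
// assumption; any remainder is simply left unused in this sketch.
func perSubscriptionBytes(totalBufferBytes, subscriptions int) int {
	if subscriptions <= 0 {
		return 0
	}
	return totalBufferBytes / subscriptions
}

func main() {
	// 1 MiB shared by four subscriptions.
	fmt.Println(perSubscriptionBytes(1<<20, 4)) // prints 262144
}
```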

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of a subscriber config.

func (Config) Diagnostics added in v1.3.0

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

func (Config) Validate added in v1.0.0

func (c Config) Validate() error

Validate returns an error if the config is invalid.
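As an illustration of how the default constants and Validate might fit together, here is a self-contained sketch. The config type is a trimmed local stand-in, and the specific checks (positive timeout, concurrency, and buffer size) are assumptions about what an invalid config looks like, not a copy of the package's rules.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented defaults.
const (
	defaultHTTPTimeout      = 30 * time.Second
	defaultWriteConcurrency = 40
	defaultWriteBufferSize  = 1000
)

// config is a trimmed stand-in for the documented Config type.
type config struct {
	HTTPTimeout      time.Duration
	WriteConcurrency int
	WriteBufferSize  int
}

// newConfig returns a config populated with the defaults above.
func newConfig() config {
	return config{
		HTTPTimeout:      defaultHTTPTimeout,
		WriteConcurrency: defaultWriteConcurrency,
		WriteBufferSize:  defaultWriteBufferSize,
	}
}

// validate applies hypothetical sanity checks; the real Validate may
// check different or additional fields.
func (c config) validate() error {
	if c.HTTPTimeout <= 0 {
		return errors.New("http-timeout must be greater than 0")
	}
	if c.WriteConcurrency <= 0 {
		return errors.New("write-concurrency must be greater than 0")
	}
	if c.WriteBufferSize <= 0 {
		return errors.New("write-buffer-size must be greater than 0")
	}
	return nil
}

func main() {
	c := newConfig()
	fmt.Println(c.validate()) // defaults pass the checks

	c.WriteBufferSize = 0
	fmt.Println(c.validate()) // reports the invalid buffer size
}
```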

type HTTP added in v1.0.0

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP supports writing points over HTTP using the line protocol.

func NewHTTP added in v1.0.0

func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)

NewHTTP returns a new HTTP points writer with default options.

func NewHTTPS added in v1.1.0

func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string, tlsConfig *tls.Config) (*HTTP, error)

NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.
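The unsafeSsl, caCerts, and tlsConfig parameters suggest how the client's TLS settings are assembled. The sketch below shows one plausible way to do that with the standard library only. buildTLSConfig is a hypothetical helper, and the way the package actually combines these inputs is not shown in this documentation.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper. It clones the base config (if
// any), applies the skip-verify flag, and loads a PEM bundle when a path
// is given.
func buildTLSConfig(base *tls.Config, unsafeSsl bool, caCerts string) (*tls.Config, error) {
	cfg := &tls.Config{}
	if base != nil {
		cfg = base.Clone() // avoid mutating the caller's config
	}
	cfg.InsecureSkipVerify = unsafeSsl

	if caCerts == "" {
		// Leaving RootCAs untouched: when it is nil, crypto/tls uses the
		// host's root CA set.
		return cfg, nil
	}

	pem, err := os.ReadFile(caCerts)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return nil, errors.New("no certificates found in " + caCerts)
	}
	cfg.RootCAs = pool
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(nil, false, "")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.InsecureSkipVerify, cfg.RootCAs == nil)
}
```

Cloning the base config keeps a shared *tls.Config safe to reuse across several destinations.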

func (*HTTP) WritePointsContext added in v1.9.3

func (h *HTTP) WritePointsContext(ctx context.Context, request WriteRequest) (err error)

WritePointsContext writes points over HTTP transport.
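A rough sketch of what writing line protocol over HTTP with a context can look like. It assumes the destination exposes an InfluxDB 1.x style /write endpoint that takes db and rp query parameters; postLines and roundTrip are hypothetical helpers, and the test server stands in for a real destination.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"time"
)

// postLines is a hypothetical helper that POSTs line protocol to
// addr + "/write". Any 2xx status is treated as success.
func postLines(ctx context.Context, client *http.Client, addr, db, rp string, lines []byte) error {
	q := url.Values{}
	q.Set("db", db)
	q.Set("rp", rp)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, addr+"/write?"+q.Encode(), bytes.NewReader(lines))
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("unexpected status %s", resp.Status)
	}
	return nil
}

// roundTrip posts one point to a throwaway local server and returns the
// query string and body that the server received.
func roundTrip() (string, string, error) {
	type received struct{ query, body string }
	got := make(chan received, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		b, _ := io.ReadAll(r.Body)
		got <- received{r.URL.RawQuery, string(b)}
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()

	// The timeout plays the role of the timeout argument to NewHTTP.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := postLines(ctx, srv.Client(), srv.URL, "mydb", "autogen", []byte("cpu value=1 1\n")); err != nil {
		return "", "", err
	}
	r := <-got
	return r.query, r.body, nil
}

func main() {
	query, body, err := roundTrip()
	fmt.Printf("%q %q %v\n", query, body, err)
}
```

Cancelling ctx aborts the request in flight, which is the main reason to thread a context through the write path.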

type PointsWriter

type PointsWriter interface {
	WritePointsContext(ctx context.Context, request WriteRequest) error
}

PointsWriter is an interface for writing points to a subscription destination. Only WritePointsContext() needs to be satisfied. PointsWriter implementations must be goroutine safe.
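To make the goroutine-safety requirement concrete, the sketch below implements a writer that several goroutines call at once. The interface and request type are simplified local stand-ins (the real WriteRequest has unexported fields), and memoryWriter and writeConcurrently are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// writeRequest and pointsWriter are simplified local stand-ins for the
// documented WriteRequest and PointsWriter.
type writeRequest struct {
	Database        string
	RetentionPolicy string
	lines           [][]byte
}

type pointsWriter interface {
	WritePointsContext(ctx context.Context, request writeRequest) error
}

// memoryWriter counts points it receives. The mutex makes it safe to call
// from many goroutines at once.
type memoryWriter struct {
	mu     sync.Mutex
	points int
}

func (m *memoryWriter) WritePointsContext(ctx context.Context, request writeRequest) error {
	if err := ctx.Err(); err != nil {
		return err // respect cancellation before doing any work
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.points += len(request.lines)
	return nil
}

// Points returns the number of points written so far.
func (m *memoryWriter) Points() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.points
}

// writeConcurrently sends the same request from several goroutines, as a
// pool of writer goroutines might.
func writeConcurrently(w pointsWriter, workers int, req writeRequest) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.WritePointsContext(context.Background(), req)
		}()
	}
	wg.Wait()
}

func main() {
	m := &memoryWriter{}
	req := writeRequest{Database: "mydb", lines: [][]byte{[]byte("cpu value=1\n"), []byte("mem value=2\n")}}
	writeConcurrently(m, 8, req)
	fmt.Println(m.Points()) // prints 16
}
```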

type Service

type Service struct {
	MetaClient interface {
		Databases() []meta.DatabaseInfo
		WaitForDataChanged() chan struct{}
	}
	NewPointsWriter func(u url.URL) (PointsWriter, error)
	Logger          *zap.Logger
	// contains filtered or unexported fields
}

Service manages forking the incoming data from InfluxDB to defined third-party destinations. Subscriptions are defined per database and retention policy.
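The NewPointsWriter field takes a url.URL, which suggests that the destination's transport is chosen from the URL. A minimal sketch of that kind of factory, assuming the scheme maps onto the UDP, HTTP, and HTTPS writers documented here; destination, newDestination, and destinationFor are hypothetical and only describe which writer would be built.

```go
package main

import (
	"fmt"
	"net/url"
)

// destination is a hypothetical description of the writer that would be
// created for a subscription URL.
type destination struct {
	Transport string // "udp", "http", or "https"
	Addr      string // address handed to the writer constructor
}

// newDestination picks a transport from the URL scheme. The mapping is an
// assumption made for illustration.
func newDestination(u url.URL) (destination, error) {
	switch u.Scheme {
	case "udp":
		// A UDP writer only needs host:port.
		return destination{Transport: "udp", Addr: u.Host}, nil
	case "http", "https":
		// An HTTP(S) writer needs the full URL.
		return destination{Transport: u.Scheme, Addr: u.String()}, nil
	default:
		return destination{}, fmt.Errorf("unknown destination scheme %q", u.Scheme)
	}
}

// destinationFor parses a raw URL and applies newDestination.
func destinationFor(raw string) (destination, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return destination{}, err
	}
	return newDestination(*u)
}

func main() {
	for _, raw := range []string{"udp://collector:9090", "https://collector:8086", "ftp://collector"} {
		d, err := destinationFor(raw)
		fmt.Println(d, err)
	}
}
```

Because NewPointsWriter is an exported field, a test could swap in a factory like this one to return in-memory writers.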

func NewService

func NewService(c Config) *Service

NewService returns a subscriber service with the given settings.

func (*Service) Close

func (s *Service) Close() error

Close terminates the subscription service. It will return an error if Open was not called first.

func (*Service) Open

func (s *Service) Open() error

Open starts the subscription service.

func (*Service) Send added in v1.9.3

func (s *Service) Send(request *coordinator.WritePointsRequest)

func (*Service) Statistics added in v1.0.0

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) WithLogger added in v1.2.0

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type Statistics added in v1.0.0

type Statistics struct {
	CreateFailures int64
	PointsWritten  int64
	WriteFailures  int64
}

Statistics maintains the statistics for the subscriber service.

type UDP

type UDP struct {
	// contains filtered or unexported fields
}

UDP supports writing points over UDP using the line protocol.

func NewUDP

func NewUDP(addr string) *UDP

NewUDP returns a new UDP points writer with default options.

func (*UDP) WritePointsContext added in v1.9.3

func (u *UDP) WritePointsContext(_ context.Context, request WriteRequest) (err error)

WritePointsContext writes points over UDP transport.
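A small sketch of sending line protocol over UDP using only the standard library. It assumes one datagram per point, which may not match how the package batches writes; sendPoints and loopback are hypothetical helpers, and the loopback listener stands in for a real destination.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// sendPoints is a hypothetical helper that dials addr and writes each
// point as its own datagram.
func sendPoints(addr string, points [][]byte) error {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()
	for _, p := range points {
		if _, err := conn.Write(p); err != nil {
			return err
		}
	}
	return nil
}

// loopback listens on an ephemeral local port, sends the points to it, and
// returns the datagrams that arrived.
func loopback(points [][]byte) ([]string, error) {
	pc, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	defer pc.Close()

	if err := sendPoints(pc.LocalAddr().String(), points); err != nil {
		return nil, err
	}

	// UDP gives no delivery guarantee, so bound the wait.
	if err := pc.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
		return nil, err
	}
	buf := make([]byte, 2048)
	var got []string
	for range points {
		n, _, err := pc.ReadFrom(buf)
		if err != nil {
			return got, err
		}
		got = append(got, string(buf[:n]))
	}
	return got, nil
}

func main() {
	got, err := loopback([][]byte{[]byte("cpu value=1 1\n"), []byte("mem value=2 2\n")})
	fmt.Printf("%q %v\n", got, err)
}
```

UDP is fire-and-forget, so a nil error here means the datagrams were handed to the network stack, not that the destination processed them.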

type WriteRequest added in v1.9.3

type WriteRequest struct {
	Database        string
	RetentionPolicy string
	// contains filtered or unexported fields
}

WriteRequest is a parsed write request.

func NewWriteRequest added in v1.9.3

func NewWriteRequest(r *coordinator.WritePointsRequest, log *zap.Logger) (wr WriteRequest, numInvalid int64)

func (*WriteRequest) Length added in v1.9.3

func (w *WriteRequest) Length() int

func (*WriteRequest) PointAt added in v1.9.3

func (w *WriteRequest) PointAt(i int) []byte

PointAt uses pointOffsets to slice the lineProtocol buffer and retrieve the i-th point in the request. The returned slice includes the trailing newline.
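The offset-based slicing described above can be sketched like this. The field names lineProtocol and pointOffsets come from the description, but their exact layout is an assumption: here pointOffsets holds the start offset of each point, and newWriteRequest is a hypothetical constructor that takes plain strings.

```go
package main

import "fmt"

// writeRequest is a local stand-in. All points live back to back in one
// buffer, each terminated by a newline.
type writeRequest struct {
	lineProtocol []byte
	pointOffsets []int // start offset of each point in lineProtocol (assumed layout)
}

// newWriteRequest is a hypothetical constructor that appends each point
// plus a trailing newline and records where it starts.
func newWriteRequest(points ...string) writeRequest {
	var w writeRequest
	for _, p := range points {
		w.pointOffsets = append(w.pointOffsets, len(w.lineProtocol))
		w.lineProtocol = append(w.lineProtocol, p...)
		w.lineProtocol = append(w.lineProtocol, '\n')
	}
	return w
}

// Length returns the number of points in the request.
func (w *writeRequest) Length() int { return len(w.pointOffsets) }

// PointAt returns the i-th point, including its trailing newline. The
// slice aliases the shared buffer, so no bytes are copied.
func (w *writeRequest) PointAt(i int) []byte {
	start := w.pointOffsets[i]
	end := len(w.lineProtocol)
	if i+1 < len(w.pointOffsets) {
		end = w.pointOffsets[i+1]
	}
	return w.lineProtocol[start:end]
}

func main() {
	w := newWriteRequest("cpu value=1", "mem value=2")
	for i := 0; i < w.Length(); i++ {
		fmt.Printf("%q\n", w.PointAt(i))
	}
}
```

Keeping one contiguous buffer lets an HTTP writer send every point in a single body while a per-point transport can still slice out individual points.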

func (*WriteRequest) SizeOf added in v1.9.3

func (w *WriteRequest) SizeOf() int
