sender

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2019 License: LGPL-2.1 Imports: 19 Imported by: 0

Documentation

Overview

Package sender is a set of types that implement skogul.Sender. A Sender in skogul is a simple primitive that receives skogul metrics and "does something with them".

The traditional and obvious sender accepts metrics and uses and external service to persist them to disk. E.g.: the InfluxDB sender stores the metrics to influxdb. The postgres sender accepts metrics and stores to postgres, and so forth.

The other type of senders are "internal", typical for routing. The classic examples are the "dupe" sender that accepts metrics and passes them on to multiple other senders - e.g.: Store to both postgres and influxdb. An other classic is the "fallback" sender: It has a list of senders and tries each one in order until one succeeds, allowing you to send to a primary influxdb normally - if influx fails, write to local disk, if that fails, write a message to the log.

The only thing a sender "must" do is implement Send(c *skogul.Container), and it is disallowed to modify the container in Send(), since multiple senders might be working on it at the same time.

To make a sender configurable, simply ensure data types in the type definition can be Unmarshalled from JSON. A small note on that is that it is necessary to use "SenderRef" and "HandlerRef" objects instead of Sender and Handler directly for now. This is to let the config engine track references that haven't resolved yet.

It also means certain data types need to be avoided or worked around. Currently, time.Duration is such an example, as it is missing a JSON unmrashaller. For such data types, a simple wrapper will do the trick, e.g. skogul.Duration wraps time.Duration.

Index

Constants

This section is empty.

Variables

View Source
var Auto map[string]*Sender

Auto maps sender-names to sender implementation, used for auto configuration.

Functions

func Add

func Add(s Sender) error

Add announces the existence of a sender to the world at large.

Types

type Backoff

type Backoff struct {
	Next    skogul.SenderRef `doc:"The sender to try"`
	Base    skogul.Duration  `doc:"Initial delay after a failure. Will double for each retry"`
	Retries uint64           `doc:"Number of retries before giving up"`
	// contains filtered or unexported fields
}

Backoff sender will send to Next, but retry up to Retries times, with exponential backoff, starting with time.Duration

func (*Backoff) Send

func (bo *Backoff) Send(c *skogul.Container) error

Send with a delay

type Batch

type Batch struct {
	Next      skogul.SenderRef `doc:"Sender that will receive batched metrics"`
	Interval  skogul.Duration  `doc:"Flush the bucket after this duration regardless of how full it is"`
	Threshold int              `doc:"Flush the bucket after reaching this amount of metrics"`
	// contains filtered or unexported fields
}

Batch sender collects metrics into a single container then passes them on after Threshold number of metrics are collected. In case Threshold is "never" reached, it will periodically flush metrics if no message has been received in Interval time.

Internally, the Batch sender consists of three parts. The first part is the Send() part, which just pushes the received container onto a channel.

The second part, which is a single, dedicated go routine, picks up said container and adds it to a batch-container. When the batch container is "full" (e.g.: exceeds Threshold) - or a timeout is reached - the batch container is pushed onto a second channel and a new, empty, batch container is created.

The third part picks up the ready-to-send containers and issues next.Send() on them. This is a separate go routine, one per NumCPU.

This means that:

  1. Batch sender will both do a "fan-in" from potentially multiple Send() calls.
  2. ... and do a fan-out afterwards.
  3. Send() will only block if two channels are full.

func (*Batch) Send

func (bat *Batch) Send(c *skogul.Container) error

Send batches up multiple metrics and passes them on after an interval or a set size is reached. It never returns error, since there is no way to know.

type Counter

type Counter struct {
	Next   skogul.SenderRef  `doc:"Reference to the next sender in the chain"`
	Stats  skogul.HandlerRef `doc:"Handler that will receive the stats periodically"`
	Period skogul.Duration   `doc:"How often to emit stats" example:"5s"`
	// contains filtered or unexported fields
}

Counter sender emits, periodically, the flow-rate of metrics through it. The stats are sent on to the Stats-sender every Period.

To avoid locks and support multiple go routines using the same counter, stats are sent over a channel to a separate goroutine that does the actual aggregation and calculation.

func (*Counter) Send

func (co *Counter) Send(c *skogul.Container) error

Send counts metrics, sends the count on a channel, then executes the next sender in the chain.

type Debug

type Debug struct {
	Prefix string `doc:"Prefix to print before any metric"`
}

Debug sender simply prints the metrics in json-marshalled format to stdout.

func (*Debug) Send

func (db *Debug) Send(c *skogul.Container) error

Send prints the JSON-formatted container to stdout

type Detacher

type Detacher struct {
	Next  skogul.SenderRef `doc:"Sender that receives the metrics."`
	Depth int              `doc:"How many containers can be pending delivery before we start blocking. Defaults to 1000."`
	// contains filtered or unexported fields
}

Detacher accepts a message, sends it to a channel, then picks it up on the other end in a separate go routine. This, unfortunately, leads to fan-in: if used in conjunction with HTTP receiver, for example, you end up going from multiple independent go routines to a single one, which is probably not what you want.

The purpose is to smooth out reading.

func (*Detacher) Send

func (de *Detacher) Send(c *skogul.Container) error

Send ensures a consumer exists, then transmits the container on a channel and returns immediately.

type Dupe

type Dupe struct {
	Next []skogul.SenderRef `doc:"List of senders that will receive metrics, in order."`
}

Dupe sender executes all provided senders in turn.

func (*Dupe) Send

func (dp *Dupe) Send(c *skogul.Container) error

Send sends data down stream

type ErrDiverter

type ErrDiverter struct {
	Next   skogul.SenderRef  `doc:"Send normal metrics here."`
	Err    skogul.HandlerRef `doc:"If the sender under Next fails, convert the error to a metric and send it here."`
	RetErr bool              `` /* 130-byte string literal not displayed */
}

ErrDiverter calls the Next sender, but if it fails, it will convert the error to a Container and send that to Err.

func (*ErrDiverter) Send

func (ed *ErrDiverter) Send(c *skogul.Container) error

Send data to the next sender. If it fails, use the Err sender.

type Fallback

type Fallback struct {
	Next []skogul.SenderRef `doc:"Ordered list of senders that will potentially receive metrics."`
}

Fallback sender tries each provided sender in turn before failing.

E.g.:

primary := sender.InfluxDB{....}
secondary := sender.Queue{....} // Not implemented yet
emergency := sender.Debug{}

fallback := sender.Fallback{}
fallback.Add(&primary)
fallback.Add(&secondary)
fallback.Add(&emergency)

This will send data to Influx normally. If Influx fails, it will send it to a queue. If that fails, it will print it to stdout.

func (*Fallback) Send

func (fb *Fallback) Send(c *skogul.Container) error

Send sends data down stream

type Fanout

type Fanout struct {
	Next    skogul.SenderRef `doc:"Sender receiving the metrics"`
	Workers int              `doc:"Number of worker threads in use. To _fan_in_ you can set this to 1."`
	// contains filtered or unexported fields
}

Fanout sender implements a worker pool for passing data on. This SHOULD be unnecessary, as the receiver should ideally do this for us (e.g.: the HTTP receiver does this natively). However, there might be times where it makes sense, specially since this can be used in reverse too: you can use the Fanout sender to limit the degree of concurrency that downstream is exposed to.

Again, this should really not be needed. If you use the fanout sender, be sure you understand why.

There only settings provided is "Next" to provide the next sender, and "Workers", that defines the size of the worker pool.

func (*Fanout) Send

func (fo *Fanout) Send(c *skogul.Container) error

Send ensures the workers are booted, then picks up a channel from available workers and sends the container to that container.

type ForwardAndFail

type ForwardAndFail struct {
	Next skogul.SenderRef `doc:"Sender receiving the metrics"`
}

ForwardAndFail sender will pass the container to the Next sender, but always returns an error. The use-case for this is to allow the fallback Sender or similar to eventually send data to a sender that ALWAYS works, e.g. the Debug-sender og just printing a message in the log, but we still want to propagate the error upwards in the stack so clients can take appropriate action.

Example use:

faf := sender.ForwardAndFail{Next: skogul.Debug{}} fb := sender.Fallback{Next: []skogul.Sender{influx, faf}}

func (*ForwardAndFail) Send

func (faf *ForwardAndFail) Send(c *skogul.Container) error

Send forwards the data to the next sender and always returns an error.

type HTTP

type HTTP struct {
	URL      string          `doc:"Fully qualified URL to send data to." example:"http://localhost:6081/ https://user:password@[::1]:6082/"`
	Timeout  skogul.Duration `doc:"HTTP timeout."`
	Insecure bool            `doc:"Disable TLS certificate validation."`
	// contains filtered or unexported fields
}

HTTP sender POSTs the Skogul JSON-encoded data to the provided URL.

func (*HTTP) Send

func (ht *HTTP) Send(c *skogul.Container) error

Send POSTS data

func (*HTTP) Verify

func (ht *HTTP) Verify() error

Verify checks that configuration is sensible

type InfluxDB

type InfluxDB struct {
	URL         string          `doc:"URL to InfluxDB API. Must include write end-point and database to write to." example:"http://[::1]:8086/write?db=foo"`
	Measurement string          `doc:"Measurement name to write to."`
	Timeout     skogul.Duration `doc:"HTTP timeout"`
	// contains filtered or unexported fields
}

InfluxDB posts data to the provided URL and measurement, using the InfluxDB line format over HTTP.

func (*InfluxDB) Send

func (idb *InfluxDB) Send(c *skogul.Container) error

Send data to Influx, re-using idb.client.

type Log

type Log struct {
	Message string `doc:"Message to print."`
}

Log sender simply executes log.Print() on a predefined message.

Intended use is in combination with other senders, e.g. to explain WHY sender.Debug() was used.

func (Log) Send

func (lg Log) Send(c *skogul.Container) error

Send logs a message and does no further processing

type MQTT

type MQTT struct {
	Address string `doc:"URL-encoded address." example:"mqtt://user:password@server/topic"`
	// contains filtered or unexported fields
}

MQTT Sender publishes messages on a MQTT message bus.

FIXME: The MQTT-sender and receiver should be updated to not use the url-encoded scheme.

func (*MQTT) Send

func (handler *MQTT) Send(c *skogul.Container) error

Send publishes the container in skogul JSON-encoded format on an MQTT topic.

type MnR

type MnR struct {
	Address      string `doc:"Address to send data to" example:"192.168.1.99:1234"`
	DefaultGroup string `doc:"Default group to use if the metadatafield group is missing."`
}

MnR sender writes to M&R port collector.

The output format is:

<timestamp>\t<groupname>\t<variable>\t<value>(\t<property>=<value>)*

Example:

1199145600 group myDevice.Variable1 100 device=myDevice name=MyVariable1

Two special metadata fields can be provided: "group" will set the M&R storage group, and "prefix" will be used to prefix all individual data variables.

E.g:

{
    "template": {
	    "timestamp": "2019-03-15T11:08:02+01:00",
	    "metadata": {
		"server": "somewhere.example.com"
	    }
    },
    "metrics": [
	{
	    "metadata": {
		"prefix": "myDevice.",
		"key": "value",
		"paramkey": "paramvalue"
	    },
	    "data": {
		"astring": "text",
		"float": 1.11,
		"integer": 5
	    }
	}
    ]
}

Will result in:

1552644482	group	myDevice.astring	text		key=value	paramkey=paramvalue	server=somewhere.example.com
1552644482	group	myDevice.float	1.11		key=value	paramkey=paramvalue	server=somewhere.example.com
1552644482	group	myDevice.integer	5		key=value	paramkey=paramvalue	server=somewhere.example.com

The default group is set to that of MnR DefaultGroup. If this is unset, the default group is "group". Meaning:

- If metadata provides "group" key, this is used - Otherwise, if DefaultGroup is set in MnR sender, this is used - Otherwise, "group" is used.

func (*MnR) Send

func (mnr *MnR) Send(c *skogul.Container) error

Send to MnR.

Implementation details: We need to write each value as its own variable to MnR, so we start by constructing two buffers for what comes before and after the key\tvalue, then iterate over m.Data.

Also, we open a new TCP connection for each call to Send() at the moment, which is really suboptimal for large quantities of data, but ok for occasional data dumps. If large metric containers are received, the cost will be negligible. But this should, of course, be fixed in the future.

type Net

type Net struct {
	Address string `doc:"Address to send data to" example:"192.168.1.99:1234"`
	Network string `doc:"Network, according to net.Dial. Typically udp or tcp."`
}

Net sends metrics to a network address

func (*Net) Send

func (n *Net) Send(c *skogul.Container) error

Send sends metrics to a network address, json-encoded

type Null

type Null struct{}

Null sender does nothing and returns nil - mainly for test-purposes

func (*Null) Send

func (n *Null) Send(c *skogul.Container) error

Send just returns nil

type SQL

type SQL struct {
	ConnStr string `` /* 233-byte string literal not displayed */
	Query   string `` /* 708-byte string literal not displayed */
	Driver  string `doc:"Database driver/system. Currently suported: mysql and postgres."`
	// contains filtered or unexported fields
}

SQL sender connects to a SQL Database, currently either MySQL(or Mariadb I suppose) or Postgres. The Connection String for MySQL is specified at https://github.com/go-sql-driver/mysql/ and postgres at http://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING .

The query is expanded using os.Expand() and will fill in timestamp/metadata/data. The sender will prep the query and essentially covert INSERT INTO foo VLAUES(${timestamp},${metadata.foo},${someData}) to foo("INSERT INTO foo VALUES(?,?,?)", timestamp, foo, someData), so they will be sensibly escaped.

func (*SQL) Send

func (sq *SQL) Send(c *skogul.Container) error

Send will send to the database, after first ensuring the connection is OK.

func (*SQL) Verify

func (sq *SQL) Verify() error

Verify ensures options are set, but currently doesn't check very well, since it is disallowed from connecting to a database and such.

type Sender

type Sender struct {
	Name    string
	Aliases []string
	Alloc   func() skogul.Sender
	Help    string
}

Sender provides a framework that all sender-implementations should follow, and allows auto-initialization.

type Sleeper

type Sleeper struct {
	Next     skogul.SenderRef `doc:"Sender that will receive delayed metrics"`
	MaxDelay skogul.Duration  `doc:"The maximum delay we will suffer"`
	Base     skogul.Duration  `doc:"The baseline - or minimum - delay"`
	Verbose  bool             `doc:"If set to true, will log delay durations"`
}

The Sleeper sender injects a random delay between Base and Base+MaxDelay before passing execution over to the Next sender.

The purpose is testing.

func (*Sleeper) Send

func (sl *Sleeper) Send(c *skogul.Container) error

Send sleeps a random duration according to Sleeper spec, then passes the data to the next sender.

type Test

type Test struct {
	// contains filtered or unexported fields
}

Test sender is used to facilitate tests, and discards any metrics, but increments the Received counter.

func (*Test) Received

func (rcv *Test) Received() uint64

Received returns the amount of containers received

func (*Test) Send

func (rcv *Test) Send(c *skogul.Container) error

Send discards data and increments the Received counter

func (*Test) Set

func (rcv *Test) Set(v uint64)

Set atomicly sets the received counter to v

func (*Test) TestNegative

func (rcv *Test) TestNegative(t failer, s skogul.Sender, c *skogul.Container)

TestNegative sends data on s and expects to fail.

func (*Test) TestQuick

func (rcv *Test) TestQuick(t failer, sender skogul.Sender, c *skogul.Container, received uint64)

TestQuick sends data on the sender and waits 5 milliseconds before checking that the data was received on the other end.

func (*Test) TestTime

func (rcv *Test) TestTime(t failer, s skogul.Sender, c *skogul.Container, received uint64, delay time.Duration)

TestTime sends the container on the specified sender, waits delay period of time, then verifies that rcv has received the expected number of containers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL