mqtt_consumer

package
Version: v1.23.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2022 License: MIT Imports: 15 Imported by: 6

README

MQTT Consumer Input Plugin

The MQTT consumer plugin reads from the specified MQTT topics and creates metrics using one of the supported input data formats.

Configuration

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  # topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  # qos = 0

  ## Connection timeout for initial connection in seconds
  # connection_timeout = "30s"

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client.  To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password used to connect to the MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  ## Supported types are int, float, uint
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type

Example Output

mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661

About Topic Parsing

The MQTT topic as a whole is stored as a tag, but this can be far too coarse to be easily used when working with the data further down the line. Topic parsing lets you extract the measurement name, tag values, and field values from individual segments of the MQTT topic, so that the information carried in the topic is stored in a meaningful way. An underscore (_) denotes an ignored segment of the topic path. See the following example.

Topic Parsing Example
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/+/cpu/23",
  ]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "telegraf/one/cpu/23"
    measurement = "_/_/measurement/_"
    tags = "tag/_/_/_"
    fields = "_/_/_/test"
    [inputs.mqtt_consumer.topic_parsing.types]
      test = "int"

Will result in the following metric:

cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291

Field Pivoting Example

You can use the pivot processor to rotate single-valued metrics into a multi-field metric. For more information, see the pivot processor documentation.

For this example these are the topics:

/sensors/CLE/v1/device5/temp
/sensors/CLE/v1/device5/rpm
/sensors/CLE/v1/device5/ph
/sensors/CLE/v1/device5/spin

And these are the metrics:

sensors,site=CLE,version=v1,device_name=device5,field=temp value=390
sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0
sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45

Using pivot in the config will rotate the metrics into a multi field metric. The config:

[[inputs.mqtt_consumer]]
    ....
    topics = ["/sensors/#"]
    [[inputs.mqtt_consumer.topic_parsing]]
        measurement = "/measurement/_/_/_/_"
        tags = "/_/site/version/device_name/field"
[[processors.pivot]]
    tag_key = "field"
    value_key = "value"

Will result in the following metric:

sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45

Metrics

  • All measurements are tagged with the incoming topic, e.g. topic=telegraf/host01/cpu

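  • When [[inputs.mqtt_consumer.topic_parsing]] is set, the measurement name, tags, and fields can additionally be taken from segments of the topic (see the Topic Parsing Example above).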

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	Connect() mqtt.Token
	SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
	AddRoute(topic string, callback mqtt.MessageHandler)
	Disconnect(quiesce uint)
}

type ClientFactory

type ClientFactory func(o *mqtt.ClientOptions) Client

type ConnectionState

type ConnectionState int
const (
	Disconnected ConnectionState = iota
	Connecting
	Connected
)

type MQTTConsumer

type MQTTConsumer struct {
	Servers                []string             `toml:"servers"`
	Topics                 []string             `toml:"topics"`
	TopicTag               *string              `toml:"topic_tag"`
	TopicParsing           []TopicParsingConfig `toml:"topic_parsing"`
	Username               string               `toml:"username"`
	Password               string               `toml:"password"`
	QoS                    int                  `toml:"qos"`
	ConnectionTimeout      config.Duration      `toml:"connection_timeout"`
	MaxUndeliveredMessages int                  `toml:"max_undelivered_messages"`

	MetricBuffer      int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`
	PersistentSession bool
	ClientID          string `toml:"client_id"`

	tls.ClientConfig

	Log telegraf.Logger
	// contains filtered or unexported fields
}

func New

func New(factory ClientFactory) *MQTTConsumer

func (*MQTTConsumer) Gather

func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error

func (*MQTTConsumer) Init

func (m *MQTTConsumer) Init() error

func (*MQTTConsumer) SampleConfig

func (*MQTTConsumer) SampleConfig() string

func (*MQTTConsumer) SetParser

func (m *MQTTConsumer) SetParser(parser parsers.Parser)

func (*MQTTConsumer) Start

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error

func (*MQTTConsumer) Stop

func (m *MQTTConsumer) Stop()

type TopicParsingConfig added in v1.21.0

type TopicParsingConfig struct {
	Topic       string            `toml:"topic"`
	Measurement string            `toml:"measurement"`
	Tags        string            `toml:"tags"`
	Fields      string            `toml:"fields"`
	FieldTypes  map[string]string `toml:"types"`
	// cached split of user given information
	MeasurementIndex int
	SplitTags        []string
	SplitFields      []string
	SplitTopic       []string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL