Published: Nov 17, 2021 License: MIT


RabbitMQ Input Plugin

Reads metrics from RabbitMQ servers via the Management Plugin.

For additional details reference the RabbitMQ Management HTTP Stats.

  ## Management Plugin url. (default: http://localhost:15672)
  # url = "http://localhost:15672"
  ## Tag added to rabbitmq_overview series; deprecated: use tags
  # name = "rmq-server-1"
  ## Credentials
  # username = "guest"
  # password = "guest"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional request timeouts
  ## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
  ## for a server's response headers after fully writing the request.
  # header_timeout = "3s"
  ## client_timeout specifies a time limit for requests made by this client.
  ## Includes connection time, any redirects, and reading the response body.
  # client_timeout = "4s"

  ## A list of nodes to gather as the rabbitmq_node measurement. If not
  ## specified, metrics for all nodes are gathered.
  # nodes = ["rabbit@node1", "rabbit@node2"]

  ## A list of queues to gather as the rabbitmq_queue measurement. If not
  ## specified, metrics for all queues are gathered.
  # queues = ["telegraf"]

  ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
  ## specified, metrics for all exchanges are gathered.
  # exchanges = ["telegraf"]

  ## Metrics to include and exclude. Globs accepted.
  ## Note that an empty array for both will include all metrics
  ## Currently the following metrics are supported: "exchange", "federation", "node", "overview", "queue"
  # metric_include = []
  # metric_exclude = []

  ## Queues to include and exclude. Globs accepted.
  ## Note that an empty array for both will include all queues
  # queue_name_include = []
  # queue_name_exclude = []

  ## Federation upstreams to include and exclude specified as an array of glob
  ## pattern strings.  Federation links can also be limited by the queue and
  ## exchange filters.
  # federation_upstream_include = []
  # federation_upstream_exclude = []
  • rabbitmq_overview
    • tags:
      • url
      • name
    • fields:
      • channels (int, channels)
      • connections (int, connections)
      • consumers (int, consumers)
      • exchanges (int, exchanges)
      • messages (int, messages)
      • messages_acked (int, messages)
      • messages_delivered (int, messages)
      • messages_delivered_get (int, messages)
      • messages_published (int, messages)
      • messages_ready (int, messages)
      • messages_unacked (int, messages)
      • queues (int, queues)
      • clustering_listeners (int, cluster nodes)
      • amqp_listeners (int, amqp nodes up)
      • return_unroutable (int, number of unroutable messages)
      • return_unroutable_rate (float, number of unroutable messages per second)
  • rabbitmq_node
    • tags:
      • url
      • node
    • fields:
      • disk_free (int, bytes)
      • disk_free_limit (int, bytes)
      • disk_free_alarm (int, disk alarm)
      • fd_total (int, file descriptors)
      • fd_used (int, file descriptors)
      • mem_limit (int, bytes)
      • mem_used (int, bytes)
      • mem_alarm (int, memory a)
      • proc_total (int, erlang processes)
      • proc_used (int, erlang processes)
      • run_queue (int, erlang processes)
      • sockets_total (int, sockets)
      • sockets_used (int, sockets)
      • running (int, node up)
      • uptime (int, milliseconds)
      • mnesia_disk_tx_count (int, number of disk transaction)
      • mnesia_ram_tx_count (int, number of ram transaction)
      • mnesia_disk_tx_count_rate (float, number of disk transaction per second)
      • mnesia_ram_tx_count_rate (float, number of ram transaction per second)
      • gc_num (int, number of garbage collection)
      • gc_bytes_reclaimed (int, bytes)
      • gc_num_rate (float, number of garbage collection per second)
      • gc_bytes_reclaimed_rate (float, bytes per second)
      • io_read_avg_time (float, number of read operations)
      • io_read_avg_time_rate (int, number of read operations per second)
      • io_read_bytes (int, bytes)
      • io_read_bytes_rate (float, bytes per second)
      • io_write_avg_time (int, milliseconds)
      • io_write_avg_time_rate (float, milliseconds per second)
      • io_write_bytes (int, bytes)
      • io_write_bytes_rate (float, bytes per second)
      • mem_connection_readers (int, bytes)
      • mem_connection_writers (int, bytes)
      • mem_connection_channels (int, bytes)
      • mem_connection_other (int, bytes)
      • mem_queue_procs (int, bytes)
      • mem_queue_slave_procs (int, bytes)
      • mem_plugins (int, bytes)
      • mem_other_proc (int, bytes)
      • mem_metrics (int, bytes)
      • mem_mgmt_db (int, bytes)
      • mem_mnesia (int, bytes)
      • mem_other_ets (int, bytes)
      • mem_binary (int, bytes)
      • mem_msg_index (int, bytes)
      • mem_code (int, bytes)
      • mem_atom (int, bytes)
      • mem_other_system (int, bytes)
      • mem_allocated_unused (int, bytes)
      • mem_reserved_unallocated (int, bytes)
      • mem_total (int, bytes)
  • rabbitmq_queue
    • tags:
      • url
      • queue
      • vhost
      • node
      • durable
      • auto_delete
    • fields:
      • consumer_utilisation (float, percent)
      • consumers (int, int)
      • idle_since (string, time - e.g., "2006-01-02 15:04:05")
      • memory (int, bytes)
      • message_bytes (int, bytes)
      • message_bytes_persist (int, bytes)
      • message_bytes_ram (int, bytes)
      • message_bytes_ready (int, bytes)
      • message_bytes_unacked (int, bytes)
      • messages (int, count)
      • messages_ack (int, count)
      • messages_ack_rate (float, messages per second)
      • messages_deliver (int, count)
      • messages_deliver_rate (float, messages per second)
      • messages_deliver_get (int, count)
      • messages_deliver_get_rate (float, messages per second)
      • messages_publish (int, count)
      • messages_publish_rate (float, messages per second)
      • messages_ready (int, count)
      • messages_redeliver (int, count)
      • messages_redeliver_rate (float, messages per second)
      • messages_unack (int, count)
      • slave_nodes (int, count)
      • synchronised_slave_nodes (int, count)
  • rabbitmq_exchange
    • tags:
      • url
      • exchange
      • type
      • vhost
      • internal
      • durable
      • auto_delete
    • fields:
      • messages_publish_in (int, count)
      • messages_publish_in_rate (int, messages per second)
      • messages_publish_out (int, count)
      • messages_publish_out_rate (int, messages per second)
  • rabbitmq_federation
    • tags:
      • url
      • vhost
      • type
      • upstream
      • exchange
      • upstream_exchange
      • queue
      • upstream_queue
    • fields:
      • acks_uncommitted (int, count)
      • consumers (int, count)
      • messages_unacknowledged (int, count)
      • messages_uncommitted (int, count)
      • messages_unconfirmed (int, count)
      • messages_confirm (int, count)
      • messages_publish (int, count)
      • messages_return_unroutable (int, count)
Sample Queries

Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:

SELECT NON_NEGATIVE_DERIVATIVE(LAST("messages_published"), 1m) AS messages_published_rate FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
Example Output
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000




View Source
const DefaultClientTimeout = 4
View Source
const DefaultPassword = "guest"

DefaultPassword will set a default value that corresponds to the default value used by Rabbitmq

View Source
const DefaultResponseHeaderTimeout = 3

Default http timeouts

View Source
const DefaultURL = "http://localhost:15672"

DefaultURL will set a default value that corresponds to the default value used by Rabbitmq

View Source
const DefaultUsername = "guest"

DefaultUsername will set a default value that corresponds to the default value used by Rabbitmq


This section is empty.


This section is empty.


type Details

type Details struct {
	Rate float64 `json:"rate"`

Details ...

type ErrorResponse added in v1.19.1

type ErrorResponse struct {
	Error  string `json:"error"`
	Reason string `json:"reason"`

Error response

type Exchange added in v1.14.0

type Exchange struct {
	Name         string
	MessageStats `json:"message_stats"`
	Type         string
	Internal     bool
	Vhost        string
	Durable      bool
	AutoDelete   bool `json:"auto_delete"`
type FederationLink struct {
	Type             string                `json:"type"`
	Queue            string                `json:"queue"`
	UpstreamQueue    string                `json:"upstream_queue"`
	Exchange         string                `json:"exchange"`
	UpstreamExchange string                `json:"upstream_exchange"`
	Vhost            string                `json:"vhost"`
	Upstream         string                `json:"upstream"`
	LocalChannel     FederationLinkChannel `json:"local_channel"`

FederationLink ...

type FederationLinkChannel added in v1.14.0

type FederationLinkChannel struct {
	AcksUncommitted        int64                             `json:"acks_uncommitted"`
	ConsumerCount          int64                             `json:"consumer_count"`
	MessagesUnacknowledged int64                             `json:"messages_unacknowledged"`
	MessagesUncommitted    int64                             `json:"messages_uncommitted"`
	MessagesUnconfirmed    int64                             `json:"messages_unconfirmed"`
	MessageStats           FederationLinkChannelMessageStats `json:"message_stats"`

FederationLinkChannel ...

type FederationLinkChannelMessageStats added in v1.14.0

type FederationLinkChannelMessageStats struct {
	Confirm                 int64   `json:"confirm"`
	ConfirmDetails          Details `json:"confirm_details"`
	Publish                 int64   `json:"publish"`
	PublishDetails          Details `json:"publish_details"`
	ReturnUnroutable        int64   `json:"return_unroutable"`
	ReturnUnroutableDetails Details `json:"return_unroutable_details"`

FederationLinkChannelMessageStats ...

type HealthCheck added in v1.14.0

type HealthCheck struct {
	Status string `json:"status"`

type Listeners added in v1.14.0

type Listeners struct {
	Protocol string `json:"protocol"`

Listeners ...

type Memory added in v1.14.0

type Memory struct {
	ConnectionReaders   int64       `json:"connection_readers"`
	ConnectionWriters   int64       `json:"connection_writers"`
	ConnectionChannels  int64       `json:"connection_channels"`
	ConnectionOther     int64       `json:"connection_other"`
	QueueProcs          int64       `json:"queue_procs"`
	QueueSlaveProcs     int64       `json:"queue_slave_procs"`
	Plugins             int64       `json:"plugins"`
	OtherProc           int64       `json:"other_proc"`
	Metrics             int64       `json:"metrics"`
	MgmtDb              int64       `json:"mgmt_db"`
	Mnesia              int64       `json:"mnesia"`
	OtherEts            int64       `json:"other_ets"`
	Binary              int64       `json:"binary"`
	MsgIndex            int64       `json:"msg_index"`
	Code                int64       `json:"code"`
	Atom                int64       `json:"atom"`
	OtherSystem         int64       `json:"other_system"`
	AllocatedUnused     int64       `json:"allocated_unused"`
	ReservedUnallocated int64       `json:"reserved_unallocated"`
	Total               interface{} `json:"total"`

Memory details

type MemoryResponse added in v1.14.0

type MemoryResponse struct {
	Memory *Memory `json:"memory"`

MemoryResponse ...

type MessageStats

type MessageStats struct {
	Ack                     int64
	AckDetails              Details `json:"ack_details"`
	Deliver                 int64
	DeliverDetails          Details `json:"deliver_details"`
	DeliverGet              int64   `json:"deliver_get"`
	DeliverGetDetails       Details `json:"deliver_get_details"`
	Publish                 int64
	PublishDetails          Details `json:"publish_details"`
	Redeliver               int64
	RedeliverDetails        Details `json:"redeliver_details"`
	PublishIn               int64   `json:"publish_in"`
	PublishInDetails        Details `json:"publish_in_details"`
	PublishOut              int64   `json:"publish_out"`
	PublishOutDetails       Details `json:"publish_out_details"`
	ReturnUnroutable        int64   `json:"return_unroutable"`
	ReturnUnroutableDetails Details `json:"return_unroutable_details"`

MessageStats ...

type Node

type Node struct {
	Name string

	DiskFree                 int64   `json:"disk_free"`
	DiskFreeLimit            int64   `json:"disk_free_limit"`
	DiskFreeAlarm            bool    `json:"disk_free_alarm"`
	FdTotal                  int64   `json:"fd_total"`
	FdUsed                   int64   `json:"fd_used"`
	MemLimit                 int64   `json:"mem_limit"`
	MemUsed                  int64   `json:"mem_used"`
	MemAlarm                 bool    `json:"mem_alarm"`
	ProcTotal                int64   `json:"proc_total"`
	ProcUsed                 int64   `json:"proc_used"`
	RunQueue                 int64   `json:"run_queue"`
	SocketsTotal             int64   `json:"sockets_total"`
	SocketsUsed              int64   `json:"sockets_used"`
	Running                  bool    `json:"running"`
	Uptime                   int64   `json:"uptime"`
	MnesiaDiskTxCount        int64   `json:"mnesia_disk_tx_count"`
	MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
	MnesiaRAMTxCount         int64   `json:"mnesia_ram_tx_count"`
	MnesiaRAMTxCountDetails  Details `json:"mnesia_ram_tx_count_details"`
	GcNum                    int64   `json:"gc_num"`
	GcNumDetails             Details `json:"gc_num_details"`
	GcBytesReclaimed         int64   `json:"gc_bytes_reclaimed"`
	GcBytesReclaimedDetails  Details `json:"gc_bytes_reclaimed_details"`
	IoReadAvgTime            float64 `json:"io_read_avg_time"`
	IoReadAvgTimeDetails     Details `json:"io_read_avg_time_details"`
	IoReadBytes              int64   `json:"io_read_bytes"`
	IoReadBytesDetails       Details `json:"io_read_bytes_details"`
	IoWriteAvgTime           float64 `json:"io_write_avg_time"`
	IoWriteAvgTimeDetails    Details `json:"io_write_avg_time_details"`
	IoWriteBytes             int64   `json:"io_write_bytes"`
	IoWriteBytesDetails      Details `json:"io_write_bytes_details"`

Node ...

type ObjectTotals

type ObjectTotals struct {
	Channels    int64
	Connections int64
	Consumers   int64
	Exchanges   int64
	Queues      int64

ObjectTotals ...

type OverviewResponse

type OverviewResponse struct {
	MessageStats *MessageStats `json:"message_stats"`
	ObjectTotals *ObjectTotals `json:"object_totals"`
	QueueTotals  *QueueTotals  `json:"queue_totals"`
	Listeners    []Listeners   `json:"listeners"`

OverviewResponse ...

type Queue

type Queue struct {
	QueueTotals            // just to not repeat the same code
	MessageStats           `json:"message_stats"`
	Memory                 int64
	Consumers              int64
	ConsumerUtilisation    float64 `json:"consumer_utilisation"`
	Name                   string
	Node                   string
	Vhost                  string
	Durable                bool
	AutoDelete             bool     `json:"auto_delete"`
	IdleSince              string   `json:"idle_since"`
	SlaveNodes             []string `json:"slave_nodes"`
	SynchronisedSlaveNodes []string `json:"synchronised_slave_nodes"`

Queue ...

type QueueTotals

type QueueTotals struct {
	Messages                   int64
	MessagesReady              int64 `json:"messages_ready"`
	MessagesUnacknowledged     int64 `json:"messages_unacknowledged"`
	MessageBytes               int64 `json:"message_bytes"`
	MessageBytesReady          int64 `json:"message_bytes_ready"`
	MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"`
	MessageRAM                 int64 `json:"message_bytes_ram"`
	MessagePersistent          int64 `json:"message_bytes_persistent"`

QueueTotals ...

type RabbitMQ

type RabbitMQ struct {
	URL      string `toml:"url"`
	Name     string `toml:"name"`
	Username string `toml:"username"`
	Password string `toml:"password"`

	ResponseHeaderTimeout config.Duration `toml:"header_timeout"`
	ClientTimeout         config.Duration `toml:"client_timeout"`

	Nodes     []string `toml:"nodes"`
	Queues    []string `toml:"queues"`
	Exchanges []string `toml:"exchanges"`

	MetricInclude             []string `toml:"metric_include"`
	MetricExclude             []string `toml:"metric_exclude"`
	QueueInclude              []string `toml:"queue_name_include"`
	QueueExclude              []string `toml:"queue_name_exclude"`
	FederationUpstreamInclude []string `toml:"federation_upstream_include"`
	FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`

	Log telegraf.Logger `toml:"-"`
	// contains filtered or unexported fields

RabbitMQ defines the configuration necessary for gathering metrics, see the sample config for further details

func (*RabbitMQ) Description

func (r *RabbitMQ) Description() string

Description ...

func (*RabbitMQ) Gather

func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error

Gather ...

func (*RabbitMQ) Init added in v1.19.1

func (r *RabbitMQ) Init() error

func (*RabbitMQ) SampleConfig

func (r *RabbitMQ) SampleConfig() string

SampleConfig ...

Source Files

