Getting MQTT Data into InfluxDB

Forwarding Data to InfluxDB via MQTT and Telegraf

One of the most common things I find myself doing when collecting and analysing time series data from the wide range of sensors I have deployed in various fields (sometimes literally) is getting that data into our time series database of choice, InfluxDB.

Usually I’m working on something like an AWS EC2 instance, running Ubuntu, so that’s the configuration I’m going to assume for the rest of this article.


Pushing Data to InfluxDB


In the past, particularly for testing, I’ve usually found myself pushing data via the InfluxDB API, using curl from the command line. This has some advantages in that you can selectively send individual lines pretty easily, but requires that multiple lines be batched up - primarily for performance reasons.

Here’s an example of posting a single datapoint (from within a bash script). I’m assuming you have the $ORG, $BUCKET, and $TOKEN environment variables set up appropriately, with the host written directly into the URL,


curl -XPOST "http://Nyarlathotep.local:9999/api/v2/write?org=$ORG&bucket=$BUCKET" \
     --header   "Authorization: Token $TOKEN" \
     --data-raw "name,tag1=194,tag2=5755 field1=7900.8 1593523543000000000"

Batching up multiple lines is as simple as supplying them in the --data-raw parameter with a newline (\n) between each line of Line Protocol.
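For anyone who prefers Python to bash for this step, here is a minimal sketch of the same batched write using only the standard library. The function name, host, organisation, bucket, and token values are placeholders I've made up for illustration; the endpoint and the Authorization header are the same ones used in the curl call above.

```python
import urllib.parse
import urllib.request


def build_write_request(host, org, bucket, token, lines):
    """Build (but do not send) a POST to InfluxDB's /api/v2/write endpoint."""
    query = urllib.parse.urlencode({"org": org, "bucket": bucket})
    # One Line Protocol record per line, separated by newlines.
    body = "\n".join(lines).encode("utf-8")
    return urllib.request.Request(
        f"http://{host}/api/v2/write?{query}",
        data=body,
        headers={"Authorization": f"Token {token}"},
        method="POST",
    )


# Hypothetical values, for illustration only.
request = build_write_request(
    "localhost:8086",
    "my-org",
    "my-bucket",
    "my-token",
    [
        "name,tag1=194,tag2=5755 field1=7900.8 1593523543000000000",
        "name,tag1=194,tag2=5755 field1=7901.2 1593523544000000000",
    ],
)
# To actually send it: urllib.request.urlopen(request)
```

The request is only built here, not sent, so you can inspect the URL and body before pointing it at a real InfluxDB instance.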

The downside here is that either I’m running a script like this in real time to process local data (usually as text files) or I’m having to call it from time to time to process data as it accrues beyond a certain point.

This inevitably leads to needing some kind of shell-based solution for batching up Line Protocol into a series of curl posts, whilst being careful not to exceed the shell’s command-line length limit. Yes, it’s OK for testing; no, it’s not convenient for large amounts of data, as I have to worry about that limit, and it’s not really a ‘fire and forget’ solution long term.
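To give a feel for the batching chore described above, here is a rough sketch in Python rather than bash. The helper name and the batch size are my own choices for illustration, not anything mandated by InfluxDB.

```python
def batches(lines, max_lines=5000):
    """Yield newline-joined request bodies of at most max_lines records each."""
    for start in range(0, len(lines), max_lines):
        yield "\n".join(lines[start:start + max_lines])


# Twelve made-up records split into bodies of at most five lines each.
records = [
    f"name,tag1=194 field1={i} {1593523543000000000 + i}" for i in range(12)
]
bodies = list(batches(records, max_lines=5))
```

Each body would then be posted separately, which is exactly the kind of bookkeeping I’d rather not maintain long term.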

As I use MQTT elsewhere within our own infrastructure for moving data around en masse when guaranteed delivery, speed, and ultimately (for me, at least) peace of mind are important, I thought it would be an ideal method to use instead of fiddling around with a combination of bash scripts and curl.

Here’s my desired setup,

Image

Initially I was put off by the imagined extra work involved with setting up an MQTT server and then as to how to ingest MQTT data at the InfluxDB end, but, of course, it was nowhere near as difficult as I originally had imagined.


Installing and Configuring an MQTT Server


To keep things very simple, especially for testing and research, I installed an MQTT server on the same machine as I had InfluxDB installed.


Installing an MQTT server locally is as simple as,


sudo apt-get install mosquitto

If you need some of the local testing tools giving you the ability to post to and watch incoming messages on topics then go ahead and install the clients too. They’re very handy.


sudo apt-get install mosquitto-clients

And that’s it. Fundamentally, a simple Mosquitto installation requires no further configuration. It can be stopped and started in the usual way using systemctl,


sudo systemctl stop mosquitto
sudo systemctl start mosquitto

Installing and Configuring a Local Telegraf Instance


Getting the data into an InfluxDB bucket at the InfluxDB end is also pretty trivial if you’re willing to make the small step into setting up and configuring Telegraf. Previously I hadn’t had the need to use Telegraf and was also put off by the seemingly complex setup procedures but, you know where I’m going, I needn’t have worried.

Although the range of Telegraf plugins offered for importing, exporting, and generally processing data is huge (and extremely handy when you get into it) the bare bones installation and setup needed to ingest data from MQTT into an InfluxDB bucket is really quite simple.

Telegraf slots in like this,

Image

I installed Telegraf by downloading the latest version from influxdata.com,


wget https://dl.influxdata.com/telegraf/releases/telegraf_1.18.3-1_amd64.deb

And then installed it just like a package,

sudo dpkg -i telegraf_1.18.3-1_amd64.deb

As with MQTT above, it too can be stopped and started in the usual way using systemctl,

sudo systemctl stop telegraf
sudo systemctl start telegraf

But, you’ll need to do some configuration here. Telegraf itself is extremely configurable, but can be a little daunting if you’re coming at it from a standing start. Indeed, Influx’s UI (when you’re logged in via a browser) comes with a wonderfully helpful way of configuring Telegraf (see the Data Tab on the left, then click on the Sources tab directly under “Load Data”).

For now, we’ll start with a blank configuration file and put in the bare minimum to get what we want. As we just want Telegraf to filter incoming MQTT messages and drop them into specific buckets (nothing more complicated; I’ll maybe get into that in a later article), we’ll just insert an “inputs” section and some “outputs” sections into telegraf.conf.

Now that we have the basics, let’s do a short worked example.


Worked Example - Collecting Sensor Data into Influx Buckets


Keeping things extremely simple, let’s say we have a bunch of sensors, one set producing some kind of wind data, and the other producing some kind of water data.

What we’d like is to separate the incoming data into two buckets, created in InfluxDB’s UI, called “wind” and “water”. Nothing special, and we’ll be using Line Protocol (rather than JSON) for the payload format in MQTT just to make it clear, though which one you use is pretty unimportant at this level.

Here’s an overview of the system schematic,

Image

To discriminate between the sensor types I’m going to, say, use the measurement parameter of the Line Protocol to designate the sensor type. So, a typical incoming Line Protocol payload would look something like,


water,id=100 level=57.6 1556813561098000000

wind,id=227 speed=33.2 1556813561112000000
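If your sensors run Python, payloads like these can be put together with a few lines of code. This is only a sketch: the function name is mine, it handles plain numeric fields, and it does no escaping of spaces or commas, which a real implementation would need.

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Format one Line Protocol record (numeric fields only, no escaping)."""
    # Tags are sorted by key, which InfluxDB recommends for write performance.
    tag_part = "".join(f",{key}={value}" for key, value in sorted(tags.items()))
    field_part = ",".join(f"{key}={value}" for key, value in fields.items())
    return f"{measurement}{tag_part} {field_part} {timestamp_ns}"


water_line = to_line_protocol("water", {"id": 100}, {"level": 57.6}, 1556813561098000000)
wind_line = to_line_protocol("wind", {"id": 227}, {"speed": 33.2}, 1556813561112000000)
```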

Again, keeping things really simple. I could have kept them in the same bucket using a tag for sensor type, but maybe in the future I’ll have lots of types and need to keep the tag cardinality small. Who knows, it’s just an example.

We’ll need to fill in several important pieces of configuration to get everything working,


  • The sensors need to know the location of the MQTT server and a Topic to post in
  • MQTT is fine knowing nothing for this kind of example

Telegraf needs to know (in its configuration file, telegraf.conf),


  • The location of the MQTT server
  • The location of InfluxDB
  • A token from InfluxDB giving it permission to post to Influx’s buckets
  • Organisation name for InfluxDB
  • Bucket name(s) for InfluxDB

Posting to an MQTT server is a trivial exercise and will depend on how your sensors are set up. Usually I have something running on a Linux variant on custom hardware, so the MQTT Python client is a common go-to for this.

Let’s post to, and retrieve from, a topic called “/test/sensordata”, so sensors will post to this topic and Telegraf will listen to it.
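On the sensor side, a minimal sketch with the Paho Python client (installed separately with pip install paho-mqtt) might look like the following. The helper names are mine, and I’m assuming the broker is on localhost at the default port 1883. The import sits inside the function only so the rest of the snippet can be tried without the library installed.

```python
def build_messages(lines, topic="/test/sensordata", qos=1):
    """Wrap each Line Protocol record in the dict format paho's publish.multiple expects."""
    return [{"topic": topic, "payload": line, "qos": qos} for line in lines]


def publish_lines(lines, host="localhost", port=1883):
    """Publish every record to the broker (needs paho-mqtt and a running broker)."""
    import paho.mqtt.publish as publish  # third-party dependency

    publish.multiple(build_messages(lines), hostname=host, port=port)


messages = build_messages(
    [
        "water,id=100 level=57.6 1556813561098000000",
        "wind,id=227 speed=33.2 1556813561112000000",
    ]
)
# publish_lines([...]) would send them once Mosquitto is up and running.
```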

Here’s the first part of telegraf.conf.

(I turn off the hostname the data is coming from so it’s not inserted into InfluxDB as a tag automatically)

[agent]
omit_hostname = true

[[inputs.mqtt_consumer]]
servers = ["tcp://localhost:1883"]
topics = [ "/test/sensordata" ]
data_format = "influx"
topic_tag = ""

You can see that I’ve specified the format as “influx” (Line Protocol) and that MQTT is running on localhost, i.e. the host where Telegraf is running.

Next, we need to filter incoming messages. For that we include some “outputs” sections in telegraf.conf.

[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "PASTE YOUR TOKEN IN HERE"
organization = "ResourceKraft"  
namepass = ["wind"]
bucket = "wind"
  
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "PASTE YOUR TOKEN IN HERE"
organization = "ResourceKraft"  
namepass = ["water"]
bucket = "water"

Here we have two sections, one for each bucket and the destination (urls) of InfluxDB is set again to localhost (with its particular port). The token is the one you need to get from InfluxDB to give Telegraf permission to post to it. Organisation has to be specified also.

What’s most interesting are the namepass parameters, one in each section. Each one only lets through messages whose Line Protocol measurement matches the string in quotes. Thus, with namepass = ["water"], only those with the measurement “water” are passed on to that section’s output, where the bucket is specified as “water”. The same goes for “wind”.
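If it helps to picture the routing, here is a toy Python model of it. This is not Telegraf’s code: I’m using fnmatch as a stand-in for Telegraf’s glob matching, and the measurement extraction assumes there are no escaped commas or spaces in the measurement name.

```python
from fnmatch import fnmatchcase

# Mirrors the two outputs sections from telegraf.conf.
OUTPUTS = [
    {"bucket": "wind", "namepass": ["wind"]},
    {"bucket": "water", "namepass": ["water"]},
]


def measurement_of(line):
    """Return the measurement name: everything before the first comma or space."""
    return line.split(" ", 1)[0].split(",", 1)[0]


def buckets_for(line, outputs=OUTPUTS):
    """List the buckets whose namepass patterns match the record's measurement."""
    name = measurement_of(line)
    return [
        output["bucket"]
        for output in outputs
        if any(fnmatchcase(name, pattern) for pattern in output["namepass"])
    ]


water_buckets = buckets_for("water,id=100 level=57.6 1556813561098000000")
wind_buckets = buckets_for("wind,id=227 speed=33.2 1556813561112000000")
other_buckets = buckets_for("soil,id=1 moisture=0.3 1556813561120000000")
```

In this model a measurement that matches neither pattern ends up in no bucket at all, since each output filters independently.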

And that’s pretty much it.

From the sensor everything with measurement “wind” ends up in the “wind” bucket in InfluxDB, and everything with the measurement “water” ends up in the “water” bucket.

There’s an awful lot more that can be done here, and this really is the simplest case, as decisions can be made on topics (using regular expressions in telegraf.conf, usually), multiple strings, and much, much more. But I hope this gets you started and encourages you to use Telegraf in combination with InfluxDB and MQTT to get your data flowing quickly and easily!


Disclaimer: Jumping up and Down Security and Performance


Some of you may be aghast at me installing things on the same machine and being a bit lax with both security and hardcoding some values, but hey, I’m just playing around with ideas and researching future projects most of the time and not putting together a production system.

You can, of course, run each and every one of these applications on separate machines, encrypt your traffic, and keep your URLS, TOKENS, and ORGANIZATION in environment variables (as you should do). 

Do (please) take the time to look into this if you’re setting up in the wild as opposed to local secure machines and networks.
