MQTT to Database

When you work on any IoT project, MQTT is one of the best protocols to use, for several reasons: messages are small, it works over low-speed networks, and it is simple to implement on low-resource hardware. More details need a new article 😉

Now the question: where and how can I store all the data collected from my sensors? 🤔🤔

Let's start with one thing: there are different types of databases out in the world, so select one of them. For me, “Mongo DB” was my selection, for several reasons. The most important is that I don’t need to care about the data structure of the MQTT message, which reduces my concerns and issues in the long run on one side, although it turned out to add a few issues in a different place.

Now, just to clarify, we have selected the following:

  • Linux Server running Debian 8/9
  • Mongo DB
  • Mosquitto MQTT Broker
  • & NodeJS 🧐🧐

Just to be honest, I’m using NodeJS in this article, and it was my first trial of it. After trying it for a while, I did not like NodeJS in general. It is your choice what to use; the concept is the same. We moved to DotNet Core on Linux after this.

There is one main concept you can depend on to help with this: MQTT wildcard subscription. To understand it better, continue below.

Wildcard

When a client subscribes to a topic, it can subscribe to the exact topic of a published message or it can use wildcards to subscribe to multiple topics simultaneously. A wildcard can only be used to subscribe to topics, not to publish a message. There are two different kinds of wildcards: single-level and multi-level.

Single Level: +

As the name suggests, a single-level wildcard replaces one topic level. The plus symbol represents a single-level wildcard in a topic.

/cctronic/devices/+/health

Any topic matches a topic with a single-level wildcard if it contains an arbitrary string instead of the wildcard. For example, a subscription to /cctronic/devices/+/health can produce the following results:

will receive this:
/cctronic/devices/smt3214/health
/cctronic/devices/smt4532/health

and will not receive this:
/cctronic/devices/smt3214/events
/cctronic/devices/smt4532/alarm/event

And so on: only one level is a wildcard, and the parent levels and subtopics around it are fixed.
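To make the rule concrete, here is a minimal sketch of single-level matching in NodeJS. The function name matchesSingleLevel is hypothetical and not part of any MQTT library; the broker does this matching for you, so this is only meant to show the idea.

```javascript
// Hypothetical helper for illustration only; the broker does the real matching.
function matchesSingleLevel(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  // "+" stands for exactly one level, so both sides must have the same depth.
  if (filterLevels.length !== topicLevels.length) return false;
  // Every level must be equal, unless the filter has "+" at that position.
  return filterLevels.every((level, i) => level === '+' || level === topicLevels[i]);
}

const filter = '/cctronic/devices/+/health';
console.log(matchesSingleLevel(filter, '/cctronic/devices/smt3214/health'));      // true
console.log(matchesSingleLevel(filter, '/cctronic/devices/smt3214/events'));      // false
console.log(matchesSingleLevel(filter, '/cctronic/devices/smt4532/alarm/event')); // false
```

The last call fails on depth alone: the topic has one more level than the filter, and + never covers more than one.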

Multi Level: #

The multi-level wildcard covers many topic levels. The hash symbol represents the multi-level wild card in the topic. For the broker to determine which topics match, the multi-level wildcard must be placed as the last character in the topic and preceded by a forward slash.

/cctronic/devices/#

Now any topic matching the first two levels of the subscription topic will be received.

will receive this:
/cctronic/devices/smt3412/events
/cctronic/devices/smt3412/health
/cctronic/devices/smt5672/events
/cctronic/devices/smt5672/health

and will not receive this:
/cctronic/nodes/dev123/reading
/cctronic/edge/ed490/health

When a client subscribes to a topic with a multi-level wildcard, it receives all messages of a topic that begins with the pattern before the wildcard character, no matter how long or deep the topic is. If you specify only the multi-level wildcard as a topic (#), you receive all messages that are sent to the MQTT broker. If you expect high throughput, subscription with a multi-level wildcard alone is an anti-pattern.
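The following is a rough sketch of how a subscription with + and # could be checked against a topic. The function name topicMatches is hypothetical, and the logic is simplified: it ignores special cases such as topics starting with $, so treat it as an illustration and not as the broker's real implementation.

```javascript
// Hypothetical helper for illustration only; the broker does the real matching.
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') {
      // "#" is only valid as the last level; it matches everything from here down.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) return false;                 // topic is shorter than the filter
    if (level !== '+' && level !== topicLevels[i]) return false; // fixed level differs
  }
  // No "#" seen: the topic must not be deeper than the filter.
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches('/cctronic/devices/#', '/cctronic/devices/smt3412/events')); // true
console.log(topicMatches('/cctronic/devices/#', '/cctronic/nodes/dev123/reading'));   // false
console.log(topicMatches('#', '/cctronic/edge/ed490/health'));                        // true
```

The last line shows why # on its own is expensive: every topic on the broker passes the check.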

Let's get to the best thing: the code.

First, create a new folder under your home directory in Linux

$ mkdir MqttMongo

then go to that folder

$ cd MqttMongo

Create a new file; let's name it mqttdb.js

$ nano mqttdb.js

Then add the following code to the file

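#!/usr/bin/env node

// Requires the "mqtt" and "mongodb" npm packages.
var mqtt = require('mqtt');
var mongodb = require('mongodb');
var mongodbClient = mongodb.MongoClient;
var mongodbURI = 'mongodb://dbuser:dbpass@dbhost:27017/dbname';
// 8883 is conventionally the TLS port; use mqtts:// if your broker expects TLS there.
var HOST = 'mqtt://mqttuser:mqttpass@mqtthost:8883';
// Multi-level wildcard: every topic under /devices
var deviceRoot = "/devices/#";
var collection, client;

mongodbClient.connect(mongodbURI, setupCollection);

// Runs once MongoDB is connected: subscribe and route every message to insertEvent.
// Note: this callback form, where the second argument is the db handle,
// matches the 2.x mongodb driver.
function setupCollection(err, db) {
  if (err) throw err;
  collection = db.collection("deviceEvents");
  client = mqtt.connect(HOST);
  client.subscribe(deviceRoot);
  client.on('message', insertEvent);
}

function insertEvent(topic, payload) {
  // Topic looks like /devices/<deviceID>/...; the leading slash makes index 0 empty,
  // so the device ID is at index 2.
  var key = topic.split("/")[2];
  console.log(key);
  var textPayload = payload.toString('utf8');
  console.log(textPayload);
  var jsonPayload;
  try {
    jsonPayload = JSON.parse(textPayload);
  } catch (e) {
    // One malformed message should not crash the whole service.
    console.log("Skipping non-JSON payload on " + topic);
    return;
  }
  jsonPayload.when = new Date();
  var jsonStore = JSON.stringify(jsonPayload);
  // One document per device; each event is appended to its "events" array.
  collection.update(
    { _id: key },
    { $push: { events: jsonStore } },
    { upsert: true },
    function (err, docs) {
      if (err) { console.log("Insert fail: " + err.message); } // Improve error handling
    }
  );
}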

The nice thing about this code is that it does three main things: it creates a new entry for each new device that publishes through the MQTT broker, it does not care about the MQTT payload structure (as long as it is valid JSON), and it separates all data by device ID, which helps when extracting the data for web display.
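To see what one message turns into without running a broker or a database, here is a small sketch that mirrors the logic of insertEvent. The function name buildUpdate and the temp field in the sample payload are made up for illustration; only the topic layout and the update shape come from the code above.

```javascript
// Hypothetical helper mirroring insertEvent, minus the database call.
function buildUpdate(topic, payload, now) {
  const key = topic.split('/')[2];                       // "/devices/<id>/..." -> <id>
  const event = JSON.parse(payload.toString('utf8'));    // payload arrives as a Buffer
  event.when = now;                                      // timestamp added on arrival
  return {
    filter: { _id: key },                                // one document per device
    update: { $push: { events: JSON.stringify(event) } },
  };
}

const result = buildUpdate(
  '/devices/smt3214/health',
  Buffer.from('{"temp": 21.5}'),                         // sample payload, made up
  new Date('2020-01-01T00:00:00Z')
);
console.log(result.filter);               // { _id: 'smt3214' }
console.log(result.update.$push.events);  // {"temp":21.5,"when":"2020-01-01T00:00:00.000Z"}
```

Note that each event is stored as a JSON string, not as a nested document, so code that reads the data back for web display would need to JSON.parse every entry in the events array.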

Note: if you face a problem or see any bugs, I’m ready to help or edit the code. Again, I’m not a NodeJS expert.
