IoT - MQTT Groovy subscriber script for updating sensor data into SQL database

Author: Jan Willem Teunisse, 27 February 2016 (edited)

Introduction

This article describes a Groovy script as an example how to subcribe to a MQTT message broker and update the received sensor data into a SQL database. The Groovy script will be a major part in a personal Internet of Things (IoT) domotica project for building a custom domotica system.

The domotica system will be based on several Raspberry Pi's in order to monitor my 'smart' Dutch energy meter, to gather several sensor data for temperature/humidity and to control an opentherm central heating system and a sunscreen.

In this IoT project an Oracle 11i XE database is used running on a Windows10 Shuttle micro desktop. For the MQTT broker we use the Windows version of the open source mosquitto broker.

For the MQTT subscriber I have chosen for the Groovy language using the Eclipse/Paho Java JAR's. During my research I have found a number of publishing examples using the Java language, but only a few for the subscriber part. Especially the part which deals with the interfacing of the received MQTT topics and payload (messages) into a database is not widely documented in examples and/or available scripts on the internet. This was a motivation to publish and share my gained experience with this article.

Reference material for further reading:

  1. HiveMQ, a MQTT provider, has several articles about the MQTT Essentials and articles on examples using various languages like Javascript, Python, etc. See bullet 3) for a Java client article.

  2. Information about the Eclipse/Paho MQTT client library, this article is about the Paho Java client part.

  3. An excellent article by Dominik Obermaier on MQTT Client Library Encyclopedia - Eclipse Paho Java

  4. The open source broker mosquitto website.

  5. An introduction to the Groovy language documentation and for SQL practices see the groovy.sql.SQL Library.


Structure of the Groovy script

The main part of the script is based on the Eclipse/Paho MQTT structure, which connects to the MQTT mosquitto broker, subscribes to the topics and payload messages sent (published) by the IoT sensors in the domotica network. The following body part shows the MQTT Client subscription handling, based on a Paho Java example by HiveMQ

Listing 1. MQTT code block



import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import groovy.transform.Field
import groovy.sql.Sql

public abstract class Mqtt2Database implements MqttCallback {
public static void main(String[] args) {
  String mqtt_host = System.getenv("MQTT_BROKER_HOST") ;
  String mqtt_port = System.getenv("MQTT_BROKER_PORT") ;
  String tempDir = System.getenv("TEMP") ;
  int sqlok = 0 ; // result of Sql update
  println "IoT MQTT subcriber - Oracle updates: ";
  println "Environment MQTT Broker Host: $mqtt_host, Port: $mqtt_port";
  println "Temp dir : $tempDir" ;
  String mqtt_url = "tcp://$mqtt_host:$mqtt_port" ;
  println "MQTT Url: $mqtt_url" ;
  println "SQL collect MQTT settings:" 
  try {
    def sql = Sql.newInstance("jdbc:oracle:thin:@192.168.10.22:1521:xe", "<username>", "<password>", "oracle.jdbc.pool.OracleDataSource")
    sql.eachRow("select value as ipadres from micasa.settings where system='mqtt' and name='broker_ip'") {
      println "  MQTT host ${it.ipadres}"
      mqtt_host = it.ipadres
    }
    sql.eachRow("select value as port from micasa.settings where system='mqtt' and name='broker_port'") {
      println "  MQTT port: ${it.port}"
      mqtt_port = it.port
    }
  sql.close() ;
  } catch (Exception e) {
        println "SQL Exception found:" ;
        e.printStackTrace();
  }
  try {
     MqttClient client = new MqttClient(mqtt_url, "micasaSubOraSQL", new MemoryPersistence());
     client.setCallback(new MqttCallback() {
       @Override
       public void connectionLost(Throwable cause) {
       // TODO Auto-generated method stub
       println "MQTT connection lost, cause: $cause";
       }

       @Override
       public void messageArrived(String topic, MqttMessage message) throws Exception {
         String bericht = message.toString() ;   
         println "Topic: $topic - Payload Msg: $bericht " ;
         sqlok = updateDatabase(topic, bericht) ;  // sqlok = 0: no update; sqlok >=1: number of updates, usually 1
         println "Database result: $sqlok " ;
       }

       @Override
       public void deliveryComplete(IMqttDeliveryToken token) {
       // TODO Auto-generated method stub
       println "MQTT delivery complete" ;
       }
     });
     client.connect(); 
     if (!client.isConnected() ) {
          println "No MQTT broker connection" ;
     }       
     client.subscribe("micasa/+/stat/+");
  } catch (MqttException e) {
        println "MQTT Exception found:" ;
        e.printStackTrace();
  }
}
}

The interfacing with the Oracle XE database is based on the groovy.sql.SQL API, see for instance Practically Groovy: JDBC programming with Groovy for more information.

The script starts with retrieving the MQTT Broker IP-address and the Broker Port using Environment Strings like MQTT_BROKER_HOST and MQTT_BROKER_PORT. These two settings are combined into the MQTT URL 'tcp://$mqtt_host:$mqtt_port'.

You will notice that these two settings are also retrieved from the SQL database table settings. This was done in the early stages of development in order to check if the SQL connection was functioning correctly in the Groovy script.

In this example the MQTT client subscribes to the topic with a 2 wildcards, the first relate to the <sensorname> and the second one relates to the <sensortype> like temperature or humidity.
In general the topic looks like micasa/<sensorname>/stat/<sensortype>.
The received payload (a MQTT word) message is in our example defined as <sensortype>=<value>;<ISO-datetimeformat>.

The main class uses two methods, the first method updateDatabase supports the updating of the sensor data into the SQL database. The second method isNumericalString checks if a value is a numerical string or not, this is necessary for generating the correct update SQL statement.

Listing 2. Method updateDatabase code block



public static int updateDatabase(String msgtopic, String msgbericht) {
  int result = 0 ;
  String sensorname = "" ; // name of the sensor derived from the topic
  String sensortype = "" ; // type derived from the payload message
  String oraDateTime = ""; // date time derives from the bericht
  String sqlText = "" ;    // sql update statement
  println "  params: $msgtopic, $msgbericht"
  // parsing topic and payload message (msgbericht)
  def topicList = msgtopic.tokenize('/')
  int lng = topicList.size
  String str1 = topicList[0]
  sensorname =  topicList[1]
  String str3 = topicList[2]
  sensortype = topicList[3]
  lng = msgbericht.length()
  int lastpos = msgbericht.lastIndexOf(';')
  int firstpos = msgbericht.indexOf(';')
  println "  payload/bericht lengte: $lng, first: $firstpos, last: $lastpos"
  String part1 = ""
  if (firstpos > 0) {
    part1 =msgbericht[0..firstpos-1]
    oraDateTime = msgbericht[lastpos+1..lng-1]
    oraDateTime = oraDateTime.replace('T', ' ')
  } else {
    part1 = msgbericht ;
    def today = new Date();
    oraDateTime = String.format('%tY%>', oraDateTime)
  sqlText = sqlText.replaceAll('<>', sensorname)
  sqlText = sqlText.replaceAll('<>', sensortype)
  sqlText = sqlText.replaceAll('<>', strValue)
  sqlText = sqlText.replaceAll('<>', strValueChr)
  // println "  SQL-Text: $sqlText"
  def sql = Sql.newInstance("jdbc:oracle:thin:@192.168.10.22:1521:xe", "<username>", "<password>", "oracle.jdbc.pool.OracleDataSource")
  sql.execute(sqlText);
  result = sql.getUpdateCount()
  sql.close() ; 
  println "  SQL result: $result" 
  return result ;
}

The input parameters of the updateDatabase method are the received topic and the payload message. The topic is tokenized in order to collect the sensorname and the sensortype. The payload message contains the value of the specific sensortype like temperature or humidity. As the second part part of the message contains an optional DateTime stamp of the time that the payload message was published to the MQTT broker. Example: temp=20.5;20160131T16:30
If the DateTime stamp is omitted in the message, the current DateTime is being used.

In the second part of the method the SQL update statement is assembled into the String variable sqlText on the basis of the constant private static final String SQL_UPDATE.
The third part is to established a SQL database connection with the Oracle database, execute the assembled SQL statement, get the number of the updated rows (in our case mostly one row) and close the connection until the next received message from the MQTT broker.

In the database table present_values we store the numerical value in the database field value and also as a string in the field valuechr. If the sensor data is not numerical but contains a string value like 'on/off', 'true/false', etc., the numerical database field value is set to zero (0). Therefore in the above mentioned method we call a simple method to check as a boolean function if the received value is a numerical string. As Groovy String handling does not directly support a method to check if a string is numerical, we use in our case a simple method found in various forums on the internet and adapted it in this example by expanding with a check on a plus (+) or minus (-) sign. See listing 3.

Listing 3. Method isNumericalString code block



public static boolean isNumericString(String input) {
    boolean result = false;
    if(input != null && input.length() > 0) {
        char[] charArray = input.toCharArray();
        for(char c : charArray) {
            if ((c >= '0' && c <= '9') || (c == '.') || (c ==',') || (c == '+') || (c =='-')){
                result = true;  // it is a digit or decimal point
            } else {
                result = false;
                break;
            }
        }
    }
    return result;
}


Back to top

Complete script

You can download the complete mqtt2database.groovy script using this link.

For personal use of the script you have to modify your own settings of the MQTT broker IP-address & port, the IP-address & port, username & password of your SQL database and the table name.

For test runs of the Groovy script I use the groovyConsole.bat or in the Windows commandline console the command %Groovy_home%\bin\groovy D:\Ontwikkelomgeving\groovy\mqtt2database.groovy. After using the Groovy compiler to compile the script into a Java class file, a command file can be called in order to start the Mqtt2Database.class file as is shown in Listing 4.

Listing 4. Running the Groovy script



REM echo off
REM  file run_mqtt2database.cmd
REM  project IoT Domotica
REM  author Jam Willem Teunisse/Version 0.9.1/Date 8 february 2016 
REM   drive is D:
REM   
set GRLIB=%GROOVY_HOME%\lib
set SCRIPTPATH=D:\Ontwikkelomgeving\groovy
D:
CD %SCRIPTPATH%
REM
:again
REM   start JAR mqtt2database 
REM  java -cp ".;%GRLIB%\*.*" -jar mqtt2database
REM
REM Running as class
java -cp ".;%GRLIB%\*" Mqtt2Database
REM
pause
REM start again if script stops due to MQTT connection time out
REM goto again
:stop
echo End-of-job


With the aid of GroovyWrapper you can compile the tested and finished Groovy script into an executable JAR file, but for the moment this is out of scope for this article.

A sample of the output in the standard output console is shown in the code block below. As you can see the first received topic contains the sensorname 'sensors', in the database table there is no such row. And that's why the SQL interface returns a zero result: no update. The second received topic does contain an existing sensorname 'werkkamer' (my home office), with a temperature of 21.5 Celsius. After the SQL update, the result is 1; so one row is updated.

Output sample



IoT MQTT subcriber - Oracle updates: 
Environment MQTT Broker Host: 192.168.10.22, Port: 1883
Temp dir : D:\Temp
MQTT Url: tcp://192.168.10.22:1883
SQL collect MQTT settings:
  MQTT host 192.168.10.22
  MQTT port: 1883
Settings MQTT Broker Host: 192.168.10.22, Port: 1883
MQTT Broker url: tcp://192.168.10.22:1883
Topic: micasa/sensors/stat/temp - Payload Msg: 21.5 
  params: micasa/sensors/stat/temp, 21.5
  payload/bericht lengte: 4, first: -1, last: -1
  SQL result: 0
Database result: 0 
Topic: micasa/werkkamer/stat/temp - Payload Msg: 21.5 
  params: micasa/werkkamer/stat/temp, 21.5
  payload/bericht lengte: 4, first: -1, last: -1
  SQL result: 1
Database result: 1 


Back to top

Summary

The main goal in this article was to focus on a working example by updating a SQL database table using MQTT messages. Instead of a SQL database other means of lightweight storage can be used like a NonSQL or key-value datastores like Redis.

In this example we subscribed to only one type of topic, using extra client.subscribe(another-topic); is possible.

In a production Groovy script we also have to deal with lost-connections, the setting of MQTT options and security issues (SSL/TSL).

Licenses and copyright

Licenses of the used software components.


© Copyright 2016 by J.W. Teunisse
This piece of software as presented in this article is provided 'as-is', without any express or implied warranty. In no event will the author be held liable for any damages arising from the use of this software.

Back to top

Comments or advice

Your comments or advice for improvement are most welcome, you can send them to the following email-address pr@jwteunisse.nl

Back to top