EMQ X rule engine series stores messages to MySQL database

Scenario introduction

This scenario requires storing qualified messages under EMQ X specified topics into the MySQL database. In order to facilitate subsequent analysis and retrieval, message content needs to be split and stored.

In this scenario, the device side reports the following information:

  • Subject: cmd/state/:id, in which ID represents the vehicle client identification code
  • Message body:

    {
      "id": "NXP-058659730253-963945118132721-22", // Client Identification Code
      "speed": 32.12, // Vehicle speed
      "direction": 198.33212, // Driving direction
      "tachometer": 3211, // Engine speed, value greater than 8000 need to be stored
      "dynamical": 8.93, // Instantaneous fuel consumption
      "location": { // GPS longitude and latitude data
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 // Reporting time
    }

When the reported data engine speed value is greater than 8000, the current information is stored for subsequent analysis of user vehicle usage.

Dead work

Create a database

Create an iot_data database to store message data, specifying that the database is coded utf8mb4 to avoid encoding problems:

CREATE DATABASE `emqx_rule_engine_output` CHARACTER SET utf8mb4;

Create data tables

According to the scenario requirements, create the data table use_statistics structure and field annotations as follows:

CREATE TABLE `use_statistics` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `client_id` varchar(100) DEFAULT NULL COMMENT 'Client Identification Code',
  `speed` float unsigned DEFAULT '0.00' COMMENT 'Current Speed',
  `tachometer` int(11) unsigned DEFAULT '0' COMMENT 'engine speed',
  `ts` int(11) unsigned DEFAULT '0' COMMENT 'Report timestamp',
  `msg_id` varchar(50) DEFAULT NULL COMMENT 'MQTT news ID',
  PRIMARY KEY (`id`),
  KEY `client_id_index` (`client_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

After successful creation, confirm the existence of the data table through the MySQL command:

Database changed
mysql> desc use_statistics;
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(11)          | NO   | PRI | NULL    | auto_increment |
| client_id  | varchar(100)     | YES  | MUL | NULL    |                |
| speed      | float unsigned   | YES  |     | 0       |                |
| tachometer | int(11) unsigned | YES  |     | 0       |                |
| ts         | int(11) unsigned | YES  |     | 0       |                |
| msg_id     | varchar(50)      | YES  |     | NULL    |                |
+------------+------------------+------+-----+---------+----------------+
6 rows in set (0.01 sec)

Configuration instructions

Create resources

Open EMQ X Dashboard, go to the resource page of the left menu, click on the new button, type MySQL server information for resource creation.

The network environment of nodes in EMQ X cluster may be different from each other. Click on the status button in the list after the resource creation is successful to check the resource connection status of each node. If the resources on the node are not available, please check the configuration and network connectivity, and click the reconnection button to reconnect manually.

Create rules

Go to the rule page of the left menu and click the new button to create the rule. Here, trigger event message publishing is selected to trigger the rule for data processing when the message is published.

After selecting the trigger event, we can see the optional fields and sample SQL on the interface:

Filter required fields

The rule engine uses SQL statements to process rule conditions. In this business, we need to select all fields in payload separately, use payload. field Name format to select them, and also need topic, qos, id information in message context. Current SQL is as follows:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

Establishing Screening Conditions

We need to define two conditions for conditional filtering using the WHERE sentence of the SQL statement.

  • Only cmd/state/:id topics are processed, and topic is filtered by using the theme wildcard=~topic: ~'cmd/state/+'
  • Processing only tachometer > 8000 messages, using comparator to filter tachometer: payload. tachometer > 8000

Combine the previous step to get the following SQL:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts,
  id
FROM
  "message.publish"
WHERE
  topic =~ 'cmd/state/+'
  AND payload.tachometer > 8000

Output Testing Using SQL Testing Function

With the help of the SQL testing function, we can view the data output after the current SQL processing in real time, which requires us to specify payload and other simulation raw data.

Payloload data is as follows, pay attention to changing the tachometer value size to meet the SQL conditions:

{
  "id": "NXP-058659730253-963945118132721-22",
  "speed": 32.12,
  "direction": 198.33212,
  "tachometer": 9001,
  "dynamical": 8.93,
  "location": {
    "lng": 116.296011,
    "lat": 40.005091
  },
  "ts": 1563268202
}

Click the SQL test switch button, change top and payload as information in the scene, and click the test button to view the data output:

The test output data are as follows:

{
  "client_id": "NXP-058659730253-963945118132721-22",
  "id": "589A429E9572FB44B0000057C0001",
  "speed": 32.12,
  "tachometer": 9001,
  "ts": 1563268202
}

The test output is in line with expectations, and we can proceed with the next steps.

Add response actions to store messages to MySQL

After the input and output of the SQL condition are correct, we continue to add the corresponding actions, configure to write the SQL statement, and store the filtering results in MySQL.

Click on the Add button in the response action, choose to save the data to MySQL action, select the resources just selected, we use the ${field Name} grammar to fill in the SQL statement, insert the data into the database, and finally click the new button to complete the rule creation.

The action's SQL configuration is as follows:

INSERT INTO 
    `use_statistics` (`client_id`, `speed`, `tachometer`, `ts`, `msg_id`)
VALUES 
    (${client_id}, ${speed}, ${tachometer}, ${ts}, ${id});

test

Expected results

We have successfully created a rule that contains a processing action with the desired effect as follows:

  1. The device reports to the cmd/state/:id topic that when the tachometer value in the message exceeds 8000, it will hit SQL, and the hit number in the rule list will be increased by 1;
  2. The use_statistics table of MySQL iot_data database will add a data value that is consistent with the current message.

Testing with the Websocket tool in Dashboard

Switch to Tool - > Websocket page and connect to EMQ X using any information client. After successful connection, send the following message on the message card:

  • Theme: cmd/state/NXP-058659730253-963945118132721-22
  • Message body:

    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 9002,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }

Click on the Send button and see the rule statistics hit data statistic value is 1 to indicate that the rule has been successfully hit. The MySQL command line to view the data table records to get the data as follows:

So far, we have realized business development by using rule engine to store messages to MySQL database.

For more information, please visit our official website emqx.io Or visit github.com/emqx/emqx Pay close attention to our open source project, please visit the detailed documentation. Official Documents.

Tags: MySQL SQL Database network

Posted on Wed, 31 Jul 2019 04:50:41 -0700 by Drace