Scenario introduction
In this scenario, messages published to specified EMQ X topics that meet a given condition must be stored in a MySQL database. To make later analysis and retrieval easier, the message content is split into separate fields before it is stored.
In this scenario, the device side reports the following information:
- Topic: cmd/state/:id, where id is the vehicle client identification code
- Message body:

  {
    "id": "NXP-058659730253-963945118132721-22", // Client identification code
    "speed": 32.12,                               // Vehicle speed
    "direction": 198.33212,                       // Driving direction
    "tachometer": 3211,                           // Engine speed; stored only when the value is greater than 8000
    "dynamical": 8.93,                            // Instantaneous fuel consumption
    "location": {                                 // GPS longitude and latitude
      "lng": 116.296011,
      "lat": 40.005091
    },
    "ts": 1563268202                              // Reporting time
  }
When the reported engine speed (tachometer) is greater than 8000, the message is stored for later analysis of how the user drives the vehicle.
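As an illustration of the report format described above, a device-side client could assemble the topic and message body roughly as follows. This is a minimal Python sketch: the function name build_report and its parameters are hypothetical, and actually publishing the message over MQTT is left out.

```python
import json
import time


def build_report(client_id, speed, direction, tachometer, dynamical, lng, lat, ts=None):
    """Return (topic, payload) for one vehicle status report.

    Hypothetical helper: field names follow the message body shown above.
    """
    topic = f"cmd/state/{client_id}"
    body = {
        "id": client_id,
        "speed": speed,
        "direction": direction,
        "tachometer": tachometer,
        "dynamical": dynamical,
        "location": {"lng": lng, "lat": lat},
        # Use the current Unix time unless the caller supplies one.
        "ts": int(time.time()) if ts is None else ts,
    }
    # Real JSON has no comments, so the annotations from the text are omitted.
    return topic, json.dumps(body)


topic, payload = build_report(
    "NXP-058659730253-963945118132721-22",
    speed=32.12, direction=198.33212, tachometer=3211,
    dynamical=8.93, lng=116.296011, lat=40.005091, ts=1563268202,
)
```

With a tachometer value of 3211, this particular report would not be stored, because it does not exceed the 8000 threshold.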
Preparation
Create a database
Create the iot_data database to store message data, specifying the utf8mb4 character set to avoid encoding problems:
CREATE DATABASE `iot_data` CHARACTER SET utf8mb4;
Create data tables
Based on the scenario requirements, create the use_statistics data table with the following structure and field comments:

CREATE TABLE `use_statistics` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `client_id` varchar(100) DEFAULT NULL COMMENT 'Client Identification Code',
  `speed` float unsigned DEFAULT '0.00' COMMENT 'Current Speed',
  `tachometer` int(11) unsigned DEFAULT '0' COMMENT 'Engine speed',
  `ts` int(11) unsigned DEFAULT '0' COMMENT 'Report timestamp',
  `msg_id` varchar(50) DEFAULT NULL COMMENT 'MQTT message ID',
  PRIMARY KEY (`id`),
  KEY `client_id_index` (`client_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
After the table has been created, confirm that it exists with the MySQL desc command:

Database changed
mysql> desc use_statistics;
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(11)          | NO   | PRI | NULL    | auto_increment |
| client_id  | varchar(100)     | YES  | MUL | NULL    |                |
| speed      | float unsigned   | YES  |     | 0       |                |
| tachometer | int(11) unsigned | YES  |     | 0       |                |
| ts         | int(11) unsigned | YES  |     | 0       |                |
| msg_id     | varchar(50)      | YES  |     | NULL    |                |
+------------+------------------+------+-----+---------+----------------+
6 rows in set (0.01 sec)
Configuration instructions
Create resources
Open the EMQ X Dashboard, go to the resource page in the left menu, click the new button, and enter the MySQL server information to create the resource.
The nodes in an EMQ X cluster may be in different network environments. After the resource has been created, click the status button in the list to check the resource connection status on each node. If the resource is unavailable on a node, check the configuration and network connectivity, then click the reconnect button to reconnect manually.
Create rules
Go to the rule page in the left menu and click the new button to create a rule. Select message publish as the trigger event, so that the rule processes data whenever a message is published.
After selecting the trigger event, we can see the optional fields and sample SQL on the interface:
Filter required fields
The rule engine uses SQL statements to express rule conditions. In this scenario we select the required payload fields individually, using the payload.fieldName format, and we also need the message id from the message context (topic and qos are available there as well). The current SQL is as follows:
SELECT
  payload.id as client_id,
  payload.speed as speed,
  payload.tachometer as tachometer,
  payload.ts as ts,
  id
FROM "message.publish"
WHERE topic =~ 't/#'
Set the filter conditions
We need two filter conditions, both expressed in the WHERE clause of the SQL statement:
- Only cmd/state/:id topics are processed. The topic is filtered with the topic wildcard operator =~: topic =~ 'cmd/state/+'
- Only messages with tachometer > 8000 are processed. The tachometer is filtered with a comparison operator: payload.tachometer > 8000
Combine the previous step to get the following SQL:
SELECT
  payload.id as client_id,
  payload.speed as speed,
  payload.tachometer as tachometer,
  payload.ts as ts,
  id
FROM "message.publish"
WHERE topic =~ 'cmd/state/+' AND payload.tachometer > 8000
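To make the wildcard behaviour of the topic condition concrete, the following Python sketch implements standard MQTT topic-filter matching, where + matches exactly one topic level and # matches all remaining levels. The function topic_matches is a hypothetical helper written for illustration; it is not EMQ X's implementation of the =~ operator, and it ignores special cases such as topics beginning with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`.

    Simplified illustration: '+' matches one level, '#' matches the rest.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here on,
            # including the parent level itself (e.g. 't/#' matches 't').
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # Literal level that does not match.
            return False

    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

Under this definition, cmd/state/+ accepts cmd/state/NXP-058659730253-963945118132721-22 but rejects both cmd/state and cmd/state/a/b, which is why + is the appropriate wildcard for a single :id level.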
Test the output with the SQL test function
With the SQL test function we can view the output of the current SQL in real time. To use it, we need to supply simulated raw data such as the payload.
The payload data is as follows. Note that the tachometer value must be changed so that it satisfies the SQL condition:

{
  "id": "NXP-058659730253-963945118132721-22",
  "speed": 32.12,
  "direction": 198.33212,
  "tachometer": 9001,
  "dynamical": 8.93,
  "location": {
    "lng": 116.296011,
    "lat": 40.005091
  },
  "ts": 1563268202
}

Turn on the SQL test switch, set the topic and payload to the values from this scenario, and click the test button to view the data output:
The test output data are as follows:
{
  "client_id": "NXP-058659730253-963945118132721-22",
  "id": "589A429E9572FB44B0000057C0001",
  "speed": 32.12,
  "tachometer": 9001,
  "ts": 1563268202
}
The test output is in line with expectations, and we can proceed with the next steps.
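The same selection and filtering logic can be reproduced offline to sanity-check expectations before touching the Dashboard. The Python sketch below mimics what the rule SQL does to a single message; apply_rule is a hypothetical function written for this illustration, not part of EMQ X, and the message id is simply passed in as an argument because in EMQ X it comes from the message context.

```python
import json


def apply_rule(topic, payload, msg_id):
    """Mimic the rule SQL: return the selected fields, or None if filtered out."""
    # WHERE topic =~ 'cmd/state/+': exactly three levels, first two literal.
    levels = topic.split("/")
    if len(levels) != 3 or levels[:2] != ["cmd", "state"]:
        return None

    data = json.loads(payload)

    # AND payload.tachometer > 8000
    tachometer = data.get("tachometer")
    if not isinstance(tachometer, (int, float)) or tachometer <= 8000:
        return None

    # SELECT payload.id as client_id, payload.speed, payload.tachometer, payload.ts, id
    return {
        "client_id": data.get("id"),
        "speed": data.get("speed"),
        "tachometer": tachometer,
        "ts": data.get("ts"),
        "id": msg_id,
    }


sample_payload = json.dumps({
    "id": "NXP-058659730253-963945118132721-22",
    "speed": 32.12,
    "direction": 198.33212,
    "tachometer": 9001,
    "dynamical": 8.93,
    "location": {"lng": 116.296011, "lat": 40.005091},
    "ts": 1563268202,
})

result = apply_rule(
    "cmd/state/NXP-058659730253-963945118132721-22",
    sample_payload,
    "589A429E9572FB44B0000057C0001",
)
```

For the sample payload, result holds the same five fields as the test output shown above; lowering tachometer to 8000 or below, or changing the topic prefix, makes the function return None.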
Add response actions to store messages to MySQL
Once the input and output of the SQL condition are correct, we add the corresponding action and configure the SQL statement that writes the filtered results to MySQL.
Click the Add button under response actions, choose the action that saves data to MySQL, and select the resource created earlier. Fill in the SQL statement using the ${fieldName} syntax to insert the data into the database, then click the new button to finish creating the rule.
The action's SQL configuration is as follows:
INSERT INTO `use_statistics` (`client_id`, `speed`, `tachometer`, `ts`, `msg_id`) VALUES (${client_id}, ${speed}, ${tachometer}, ${ts}, ${id});
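To show how ${fieldName} placeholders relate to the fields produced by the rule SQL, the following Python sketch converts the template into a parameterized statement and executes it. Several things here are assumptions made for illustration: the helper to_parameterized is hypothetical, an in-memory SQLite database with a simplified table stands in for MySQL so the example is self-contained, and nothing here describes how EMQ X performs the substitution internally.

```python
import re
import sqlite3

# The action's SQL template, as configured above.
TEMPLATE = (
    "INSERT INTO `use_statistics` (`client_id`, `speed`, `tachometer`, `ts`, `msg_id`) "
    "VALUES (${client_id}, ${speed}, ${tachometer}, ${ts}, ${id});"
)


def to_parameterized(template, row):
    """Replace each ${name} with '?' and collect the matching values in order."""
    names = re.findall(r"\$\{(\w+)\}", template)
    sql = re.sub(r"\$\{\w+\}", "?", template)
    return sql, [row[name] for name in names]


# Output of the rule SQL for the test message.
row = {
    "client_id": "NXP-058659730253-963945118132721-22",
    "id": "589A429E9572FB44B0000057C0001",
    "speed": 32.12,
    "tachometer": 9001,
    "ts": 1563268202,
}

sql, params = to_parameterized(TEMPLATE, row)

# SQLite stand-in for the MySQL table (simplified column types).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE use_statistics ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, client_id TEXT, speed REAL, "
    "tachometer INTEGER, ts INTEGER, msg_id TEXT)"
)
conn.execute(sql, params)
stored = conn.execute(
    "SELECT client_id, speed, tachometer, ts, msg_id FROM use_statistics"
).fetchall()
```

Binding values as parameters rather than splicing them into the SQL text means string fields such as client_id need no manual quoting. Note that ${id} refers to the message id selected by the rule, which is written to the msg_id column.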
Test
Expected results
We have created a rule that contains one processing action. The expected behaviour is as follows:
- When a device reports to a cmd/state/:id topic and the tachometer value in the message exceeds 8000, the message hits the SQL and the hit count in the rule list increases by 1;
- A new record consistent with the current message is added to the use_statistics table of the MySQL iot_data database.
Testing with the Websocket tool in Dashboard
Switch to the Tool -> Websocket page and connect to EMQ X with any client information. After the connection succeeds, send the following message from the message card:
- Topic: cmd/state/NXP-058659730253-963945118132721-22
- Message body:

  {
    "id": "NXP-058659730253-963945118132721-22",
    "speed": 32.12,
    "direction": 198.33212,
    "tachometer": 9002,
    "dynamical": 8.93,
    "location": {
      "lng": 116.296011,
      "lat": 40.005091
    },
    "ts": 1563268202
  }
Click the Send button. A hit count of 1 in the rule statistics indicates that the rule was hit successfully. Viewing the table records from the MySQL command line gives the following data:
We have now implemented the business requirement of using the rule engine to store messages in a MySQL database.
For more information, visit our official website emqx.io, or follow our open source project at github.com/emqx/emqx. For details, see the official documentation.