MQTT 协议
2023年7月10日大约 3 分钟
MQTT 协议
MQTT 是机器对机器(M2M)/“物联网”连接协议。
它被设计为一种非常轻量级的发布/订阅消息传递。
对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。
IoTDB 支持 MQTT v3.1(OASIS 标准)协议。
IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。
![](https://alioss.timecho.com/docs/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png)
内置 MQTT 服务
内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。
MQTT 主题与 IoTDB 时间序列相对应。
消息有效载荷可以由 Java SPI 加载的PayloadFormatter
格式化为事件,默认实现为JSONPayloadFormatter
默认的json
格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:
{
"device":"root.sg.d1",
"timestamp":1586076045524,
"measurements":["s1","s2"],
"values":[0.530635,0.530635]
}
或者
{
"device":"root.sg.d1",
"timestamps":[1586076045524,1586076065526],
"measurements":["s1","s2"],
"values":[[0.530635,0.530635], [0.530655,0.530695]]
}
或者以上两者的JSON数组形式。
![](https://alioss.timecho.com/docs/img/github/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png)
MQTT 配置
默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties
加载配置。
配置如下:
名称 | 描述 | 默认 |
---|---|---|
enable_mqtt_service | 是否启用 mqtt 服务 | false |
mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 |
mqtt_port | mqtt 服务绑定端口 | 1883 |
mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 |
mqtt_payload_formatter | mqtt 消息有效负载格式化程序 | json |
mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 |
## 示例代码
以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
自定义 MQTT 消息格式
事实上可以通过简单编程来实现 MQTT 消息的格式自定义。
可以在源码的 example/mqtt-customize
项目中找到一个简单示例。
步骤:
- 创建一个 Java 项目,增加如下依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
- 创建一个实现类,实现接口
org.apache.iotdb.db.mqtt.protocol.PayloadFormatter
package org.apache.iotdb.mqtt.server;
import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
@Override
public List<Message> format(ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return null;
}
String json = payload.toString(StandardCharsets.UTF_8);
// parse data from the json and generate Messages and put them into List<Meesage> ret
List<Message> ret = new ArrayList<>();
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 2; i++) {
long ts = i;
Message message = new Message();
message.setDevice("d" + i);
message.setTimestamp(ts);
message.setMeasurements(Arrays.asList("s1", "s2"));
message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
ret.add(message);
}
return ret;
}
@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
return "CustomizedJson";
}
}
- 修改项目中的
src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
文件:
将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
在本例中,文件内容为:org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
- 编译项目生成一个 jar 包:
mvn package -DskipTests
在 IoTDB 服务端:
- 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。
- 打开 MQTT 服务参数. (
enable_mqtt_service=true
inconf/iotdb-system.properties
) - 用刚才的实现类中的 getName() 方法的返回值 设置为
conf/iotdb-system.properties
中mqtt_payload_formatter
的值,
, 在本例中,为CustomizedJson
- 启动 IoTDB
- 搞定
More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得:payload.forEachByte()
or payload.array
。