2023年7月10日大约 3 分钟
MQTT 协议
MQTT 是机器对机器(M2M)/“物联网”连接协议。
它被设计为一种非常轻量级的发布/订阅消息传递。
对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。
IoTDB 支持 MQTT v3.1(OASIS 标准)协议。
IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。
内置 MQTT 服务
内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。
MQTT 主题与 IoTDB 时间序列相对应。
消息有效载荷可以由 Java SPI 加载的PayloadFormatter
格式化为事件,默认实现为JSONPayloadFormatter
默认的json
格式化程序支持两种 json 格式,以下是 MQTT 消息有效负载示例:
{
"device":"root.sg.d1",
"timestamp":1586076045524,
"measurements":["s1","s2"],
"values":[0.530635,0.530635]
}
或者
{
"device":"root.sg.d1",
"timestamps":[1586076045524,1586076065526],
"measurements":["s1","s2"],
"values":[[0.530635,0.530635], [0.530655,0.530695]]
}
MQTT 配置
默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdbengine.properties
加载配置。
配置如下:
名称 | 描述 | 默认 |
---|---|---|
enable_mqtt_service | 是否启用 mqtt 服务 | false |
mqtt_host | mqtt 服务绑定主机 | 0.0.0.0 |
mqtt_port | mqtt 服务绑定端口 | 1883 |
mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 |
mqtt_payload_formatter | mqtt 消息有效负载格式化程序 | json |
mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 |
示例代码
以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
自定义 MQTT 消息格式
事实上可以通过简单编程来实现 MQTT 消息的格式自定义。
可以在源码的 example/mqtt-customize
项目中找到一个简单示例。
步骤:
- 创建一个 Java 项目,增加如下依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
- 创建一个实现类,实现接口
org.apache.iotdb.db.mqtt.PayloadFormatter.java
package org.apache.iotdb.mqtt.server;
import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
@Override
public List<Message> format(ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return null;
}
String json = payload.toString(StandardCharsets.UTF_8);
// parse data from the json and generate Messages and put them into List<Meesage> ret
List<Message> ret = new ArrayList<>();
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 2; i++) {
long ts = i;
Message message = new Message();
message.setDevice("d" + i);
message.setTimestamp(ts);
message.setMeasurements(Arrays.asList("s1", "s2"));
message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
ret.add(message);
}
return ret;
}
@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
return "CustomizedJson";
}
}
- 修改项目中的
src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
文件:
将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
在本例中,文件内容为:org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
- 编译项目生成一个 jar 包:
mvn package -DskipTests
在 IoTDB 服务端:
- 将刚才的 jar 包放入 IoTDB 的 lib 文件夹内
- 打开 MQTT 服务参数. (
enable_mqtt_service=true
inconf/iotdb-engine.properties
) - 用刚才的实现类中的 getName() 方法的返回值 设置为
conf/iotdb-engine.properties
中mqtt_payload_formatter
的值,
, 在本例中,为CustomizedJson
- 启动 IoTDB
- 搞定.
More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得:payload.forEachByte()
or payload.array
。