跳至主要內容

大约 3 分钟

MQTT 协议

MQTTopen in new window 是机器对机器(M2M)/“物联网”连接协议。

它被设计为一种非常轻量级的发布/订阅消息传递。

对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。

IoTDB 支持 MQTT v3.1(OASIS 标准)协议。
IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。

内置 MQTT 服务

内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。
MQTT 主题与 IoTDB 时间序列相对应。
消息有效载荷可以由 Java SPI 加载的PayloadFormatter格式化为事件,默认实现为JSONPayloadFormatter
默认的json格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:

 {
      "device":"root.sg.d1",
      "timestamp":1586076045524,
      "measurements":["s1","s2"],
      "values":[0.530635,0.530635]
 }

或者

 {
      "device":"root.sg.d1",
      "timestamps":[1586076045524,1586076065526],
      "measurements":["s1","s2"],
      "values":[[0.530635,0.530635], [0.530655,0.530695]]
 }

或者以上两者的JSON数组形式。

MQTT 配置

默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdb-common.properties加载配置。

配置如下:

名称描述默认
enable_mqtt_service是否启用 mqtt 服务false
mqtt_hostmqtt 服务绑定主机127.0.0.1
mqtt_portmqtt 服务绑定端口1883
mqtt_handler_pool_size处理 mqtt 消息的处理程序池大小1
mqtt_payload_formattermqtt 消息有效负载格式化程序json
mqtt_max_message_sizemqtt 消息最大长度(字节)1048576

### 示例代码
以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。

MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

Random random = new Random();
for (int i = 0; i < 10; i++) {
   String payload = String.format("{\n" +
           "\"device\":\"root.sg.d1\",\n" +
           "\"timestamp\":%d,\n" +
           "\"measurements\":[\"s1\"],\n" +
           "\"values\":[%f]\n" +
           "}", System.currentTimeMillis(), random.nextDouble());

   connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}

connection.disconnect();

自定义 MQTT 消息格式

事实上可以通过简单编程来实现 MQTT 消息的格式自定义。
可以在源码的 example/mqtt-customize 项目中找到一个简单示例。

步骤:

  • 创建一个 Java 项目,增加如下依赖
        <dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-server</artifactId>
            <version>${project.version}</version>
        </dependency>
  • 创建一个实现类,实现接口 org.apache.iotdb.db.mqtt.protocol.PayloadFormatter
package org.apache.iotdb.mqtt.server;

import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomizedJsonPayloadFormatter implements PayloadFormatter {

    @Override
    public List<Message> format(ByteBuf payload) {
        // Suppose the payload is a json format
        if (payload == null) {
            return null;
        }

        String json = payload.toString(StandardCharsets.UTF_8);
        // parse data from the json and generate Messages and put them into List<Meesage> ret
        List<Message> ret = new ArrayList<>();
        // this is just an example, so we just generate some Messages directly
        for (int i = 0; i < 2; i++) {
            long ts = i;
            Message message = new Message();
            message.setDevice("d" + i);
            message.setTimestamp(ts);
            message.setMeasurements(Arrays.asList("s1", "s2"));
            message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
            ret.add(message);
        }
        return ret;
    }

    @Override
    public String getName() {
        // set the value of mqtt_payload_formatter in iotdb-datanode.properties as the following string:
        return "CustomizedJson";
    }
}
  • 修改项目中的 src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter 文件:
    将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
    在本例中,文件内容为: org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
  • 编译项目生成一个 jar 包: mvn package -DskipTests

在 IoTDB 服务端:

  • 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。
  • 打开 MQTT 服务参数. (enable_mqtt_service=true in conf/iotdb-datanode.properties)
  • 用刚才的实现类中的 getName() 方法的返回值 设置为 conf/iotdb-datanode.propertiesmqtt_payload_formatter 的值,
    , 在本例中,为 CustomizedJson
  • 启动 IoTDB
  • 搞定.

More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得:
payload.forEachByte() or payload.array

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.