跳至主要內容

数据同步

...大约 11 分钟

数据同步

数据同步是工业物联网的典型需求,通过数据同步机制,可实现IoTDB之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。

功能介绍

同步任务概述

一个数据同步任务包含2个阶段:

  • 抽取(Extract)阶段:该部分用于从源 IoTDB 抽取数据,在SQL语句中的 Extractor 部分定义
  • 发送(Connect)阶段:该部分用于向目标 IoTDB 发送数据,在SQL语句中的 Connector 部分定义

通过 SQL 语句声明式地配置2个部分的具体内容,可实现灵活的数据同步能力。

同步任务 - 创建

使用 CREATE PIPE 语句来创建一条数据同步任务,下列属性中PipeIdconnector为必填项,extractorprocessor为选填项,输入SQL时注意 EXTRACTOR CONNECTOR 插件顺序不能替换。

SQL 示例如下:

CREATE PIPE <PipeId> -- PipeId 是能够唯一标定任务任务的名字
-- 数据抽取插件,必填插件
WITH EXTRACTOR (
  [<parameter> = <value>,],
-- 数据连接插件,必填插件
WITH CONNECTOR (
  [<parameter> = <value>,],
)

📌 注:使用数据同步功能,请保证接收端开启自动创建元数据

同步任务 - 管理

数据同步任务有三种状态:RUNNING、STOPPED和DROPPED。任务状态转换如下图所示:

状态迁移图
状态迁移图

一个数据同步任务在生命周期中会经过多种状态:

  • RUNNING: 运行状态。
  • STOPPED: 停止状态。
    • 说明1:任务的初始状态为停止状态,需要使用SQL语句启动任务
    • 说明2:用户也可以使用SQL语句手动将一个处于运行状态的任务停止,此时状态会从 RUNNING 变为 STOPPED
    • 说明3:当一个任务出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED
  • DROPPED:删除状态。

我们提供以下SQL语句对同步任务进行状态管理。

启动任务

创建之后,任务不会立即被处理,需要启动任务。使用START PIPE语句来启动任务,从而开始处理数据:

START PIPE<PipeId>

停止任务

停止处理数据:

STOP PIPE <PipeId>

删除任务

删除指定任务:

DROP PIPE <PipeId>

删除任务不需要先停止同步任务。

查看任务

查看全部任务:

SHOW PIPES

查看指定任务:

SHOW PIPE <PipeId>

插件

为了使得整体架构更加灵活以匹配不同的同步场景需求,在上述同步任务框架中IoTDB支持进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 connector 插件,并加载至IoTDB系统进行使用。

模块插件预置插件自定义插件
抽取(Extract)Extractor 插件iotdb-extractor不支持
发送(Connect)Connector 插件iotdb-thrift-connector、iotdb-air-gap-connector支持

预置插件

预置插件如下:

插件名称类型介绍适用版本
iotdb-extractorextractor 插件默认的extractor插件,用于抽取 IoTDB 历史或实时数据1.2.x
iotdb-thrift-connectorconnector 插件用于 IoTDB(v1.2.0及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景1.2.x
iotdb-air-gap-connectorconnector 插件用于 IoTDB(v1.2.2+)向 IoTDB(v1.2.2+)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等1.2.1以上

每个插件的详细参数可参考本文参数说明章节。

查看插件

查看系统中的插件(含自定义与内置插件)可以用以下语句:

SHOW PIPEPLUGINS

返回结果如下(其中部分插件为系统内部插件,将在1.3.0版本中删除):

IoTDB> SHOW PIPEPLUGINS
+----------------------------+----------+--------------------------------------------------------------------------------+----------------------------------------------------+
|                  PluginName|PluginType|                                                                       ClassName|                                           PluginJar|
+----------------------------+----------+--------------------------------------------------------------------------------+----------------------------------------------------+
|        DO-NOTHING-CONNECTOR|   Builtin|       org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector|                                                    |
|        DO-NOTHING-PROCESSOR|   Builtin|       org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor|                                                    |
|     IOTDB-AIR-GAP-CONNECTOR|   Builtin|     org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector|                                                    |
|             IOTDB-EXTRACTOR|   Builtin|           org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor|                                                    |
| IOTDB-LEGACY-PIPE-CONNECTOR|   Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector|                                                    |
|IOTDB-THRIFT-ASYNC-CONNECTOR|   Builtin|org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector|                                                    |
|      IOTDB-THRIFT-CONNECTOR|   Builtin|     org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector|                                                    |
| IOTDB-THRIFT-SYNC-CONNECTOR|   Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector|                                                    |
+----------------------------+----------+--------------------------------------------------------------------------------+----------------------------------------------------+

使用示例

全量数据同步

本例子用来演示将一个 IoTDB 的所有数据同步至另一个IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 connector 的 iotdb-thrift-connector 插件(内置插件),需指定接收端地址,这个例子中指定了'connector.ip'和'connector.port',也可指定'connector.node-urls',如下面的示例语句:

create pipe A2B
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.ip'='127.0.0.1',
  'connector.port'='6668'
)

历史数据同步

本例子用来演示同步某个历史时间范围(2023年8月23日8点到2023年10月23日8点)的数据至另一个IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 extractor 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将extractor.realtime.enable参数配置为false;同时需要配置数据的起止时间start-time和end-time以及传输的模式mode,此处推荐mode设置为 hybrid 模式(hybrid模式为混合传输,在无数据积压时采用实时传输方式,有数据积压时采用批量传输方式,并根据系统内部情况自动切换)。

详细语句如下:

create pipe A2B
WITH EXTRACTOR (
'extractor'= 'iotdb-extractor',
'extractor.realtime.enable' = 'false', 
'extractor.realtime.mode'='hybrid',
'extractor.history.start-time' = '2023.08.23T08:00:00+00:00',
'extractor.history.end-time' = '2023.10.23T08:00:00+00:00') 
with connector (
'connector'='iotdb-thrift-async-connector',
'connector.node-urls'='xxxx:6668',
'connector.batch.enable'='false')

双向数据传输

本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示:

在这个例子中,为了避免数据无限循环,需要将A和B上的参数extractor.forwarding-pipe-requests 均设置为 false,表示不转发从另一pipe传输而来的数据。同时将'extractor.history.enable' 设置为 false,表示不传输历史数据,即不同步创建该任务前的数据。

详细语句如下:

在 A IoTDB 上执行下列语句:

create pipe AB
with extractor (
  'extractor.history.enable' = 'false',
  'extractor.forwarding-pipe-requests' = 'false',
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.ip'='127.0.0.1',
  'connector.port'='6668'
)

在 B IoTDB 上执行下列语句:

create pipe BA
with extractor (
  'extractor.history.enable' = 'false',
  'extractor.forwarding-pipe-requests' = 'false',
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.ip'='127.0.0.1',
  'connector.port'='6667'
)

级联数据传输

本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由A集群同步至B集群,再同步至C集群,数据链路如下图所示:

在这个例子中,为了将A集群的数据同步至C,在BC之间的pipe需要将 extractor.forwarding-pipe-requests 配置为true,详细语句如下:

在A IoTDB上执行下列语句,将A中数据同步至B:

create pipe AB
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.ip'='127.0.0.1',
  'connector.port'='6668'
)

在B IoTDB上执行下列语句,将B中数据同步至C:

create pipe BC
with extractor (
  'extractor.forwarding-pipe-requests' = 'true',
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.ip'='127.0.0.1',
  'connector.port'='6669'
)

跨网闸数据传输

本例子用来演示将一个 IoTDB 的数据,经过单向网闸,同步至另一个IoTDB的场景,数据链路如下图所示:

在这个例子中,需要使用 connector 任务中的iotdb-air-gap-connector 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句,其中ip和port填写网闸信息,详细语句如下:

create pipe A2B
with connector (
  'connector'='iotdb-air-gap-connector',
  'connector.ip'='10.53.53.53',
  'connector.port'='9780'
)

参考:注意事项

可通过修改 IoTDB 配置文件(iotdb-common.properties)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1

# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8

# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false

# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780

参考:参数说明

extractor 参数

keyvaluevalue 取值范围是否必填默认取值
extractoriotdb-extractorString: iotdb-extractor必填-
extractor.pattern用于筛选时间序列的路径前缀String: 任意的时间序列前缀选填root
extractor.history.enable是否同步历史数据Boolean: true, false选填true
extractor.history.start-time同步历史数据的开始 event time,包含 start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MIN_VALUE
extractor.history.end-time同步历史数据的结束 event time,包含 end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MAX_VALUE
extractor.realtime.enable是否同步实时数据Boolean: true, false选填true
extractor.realtime.mode实时数据的抽取模式String: hybrid, log, file选填hybrid
extractor.forwarding-pipe-requests是否转发由其他 Pipe (通常是数据同步)写入的数据Boolean: true, false选填true

💎 说明:历史数据与实时数据的差异

  • 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
  • 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
  • 全量数据: 全量数据 = 历史数据 + 实时数据

💎 ​说明:数据抽取模式hybrid, log和file的差异

  • hybrid(推荐):该模式下,任务将优先对数据进行实时处理、发送,当数据产生积压时自动切换至批量发送模式,其特点是平衡了数据同步的时效性和吞吐量
  • ​log:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐
  • file:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐

connector 参数

iotdb-thrift-connector

keyvaluevalue 取值范围是否必填默认取值
connectoriotdb-thrift-connector 或 iotdb-thrift-sync-connectorString: iotdb-thrift-connector 或 iotdb-thrift-sync-connector必填
connector.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip(请注意同步任务不支持向自身服务进行转发)String选填与 connector.node-urls 任选其一填写
connector.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 port(请注意同步任务不支持向自身服务进行转发)Integer选填与 connector.node-urls 任选其一填写
connector.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 connector.ip:connector.port 任选其一填写
connector.batch.enable是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPSBoolean: true, false选填true
connector.batch.max-delay-seconds在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)Integer选填1
connector.batch.size-bytes在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)Long选填

iotdb-air-gap-connector

keyvaluevalue 取值范围是否必填默认取值
connectoriotdb-air-gap-connectorString: iotdb-air-gap-connector必填
connector.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ipString选填与 connector.node-urls 任选其一填写
connector.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 portInteger选填与 connector.node-urls 任选其一填写
connector.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 urlString。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 connector.ip:connector.port 任选其一填写
connector.air-gap.handshake-timeout-ms发送端与接收端在首次尝试建立连接时握手请求的超时时长,单位:毫秒Integer选填5000

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.