跳至主要內容

数据同步

大约 11 分钟

数据同步

数据同步是工业物联网的典型需求,通过数据同步机制,可实现IoTDB之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。

功能介绍

同步任务概述

一个数据同步任务包含2个阶段:

  • 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在SQL语句中的 source 部分定义
  • 发送(Sink)阶段:该部分用于向目标 IoTDB 发送数据,在SQL语句中的 sink 部分定义

通过 SQL 语句声明式地配置2个部分的具体内容,可实现灵活的数据同步能力。

同步任务 - 创建

使用 CREATE PIPE 语句来创建一条数据同步任务,下列属性中PipeIdsink为必填项,sourceprocessor为选填项,输入SQL时注意 SOURCE SINK 插件顺序不能替换。

SQL 示例如下:

CREATE PIPE <PipeId> -- PipeId 是能够唯一标定任务任务的名字
-- 数据抽取插件,必填插件
WITH SOURCE (
  [<parameter> = <value>,],
-- 数据连接插件,必填插件
WITH SINK (
  [<parameter> = <value>,],
)

📌 注:使用数据同步功能,请保证接收端开启自动创建元数据

同步任务 - 管理

数据同步任务有三种状态:RUNNING、STOPPED和DROPPED。任务状态转换如下图所示:

状态迁移图
状态迁移图

一个数据同步任务在生命周期中会经过多种状态:

  • RUNNING: 运行状态。
  • STOPPED: 停止状态。
    • 说明1:任务的初始状态为停止状态,需要使用SQL语句启动任务
    • 说明2:用户也可以使用SQL语句手动将一个处于运行状态的任务停止,此时状态会从 RUNNING 变为 STOPPED
    • 说明3:当一个任务出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED
  • DROPPED:删除状态。

我们提供以下SQL语句对同步任务进行状态管理。

启动任务

创建之后,任务不会立即被处理,需要启动任务。使用START PIPE语句来启动任务,从而开始处理数据:

START PIPE<PipeId>

停止任务

停止处理数据:

STOP PIPE <PipeId>

删除任务

删除指定任务:

DROP PIPE <PipeId>

删除任务不需要先停止同步任务。

查看任务

查看全部任务:

SHOW PIPES

查看指定任务:

SHOW PIPE <PipeId>

插件

为了使得整体架构更加灵活以匹配不同的同步场景需求,在上述同步任务框架中IoTDB支持进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 sink 插件,并加载至IoTDB系统进行使用。

模块插件预置插件自定义插件
抽取(Source)Source 插件iotdb-source不支持
发送(Sink)Sink 插件iotdb-thrift-sink、iotdb-air-gap-sink支持

预置插件

预置插件如下:

插件名称类型介绍适用版本
iotdb-sourcesource 插件默认的 source 插件,用于抽取 IoTDB 历史或实时数据1.2.x
iotdb-thrift-sinksink 插件用于 IoTDB(v1.2.0及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景1.2.x
iotdb-air-gap-sinksink 插件用于 IoTDB(v1.2.2+)向 IoTDB(v1.2.2+)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等1.2.1以上

每个插件的详细参数可参考本文参数说明章节。

查看插件

查看系统中的插件(含自定义与内置插件)可以用以下语句:

SHOW PIPEPLUGINS

返回结果如下:

IoTDB> show pipeplugins
+--------------------+----------+---------------------------------------------------------------------------+---------+
|          PluginName|PluginType|                                                                  ClassName|PluginJar|
+--------------------+----------+---------------------------------------------------------------------------+---------+
|DO-NOTHING-PROCESSOR|   Builtin|  org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor|         |
|     DO-NOTHING-SINK|   Builtin|  org.apache.iotdb.commons.pipe.plugin.builtin.sink.DoNothingSink|         |
|  IOTDB-AIR-GAP-SINK|   Builtin|org.apache.iotdb.commons.pipe.plugin.builtin.sink.IoTDBAirGapSink|         |
|        IOTDB-SOURCE|   Builtin|      org.apache.iotdb.commons.pipe.plugin.builtin.source.IoTDBSOURCE|         |
|   IOTDB-THRIFT-SINK|   Builtin|org.apache.iotdb.commons.pipe.plugin.builtin.sink.IoTDBThriftSinkr|         |
|         OPC-UA-SINK|   Builtin|      org.apache.iotdb.commons.pipe.plugin.builtin.sink.OpcUaSink|         |
+--------------------+----------+---------------------------------------------------------------------------+---------+

使用示例

全量数据同步

本例子用来演示将一个 IoTDB 的所有数据同步至另一个IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 sink 的 iotdb-thrift-sink 插件(内置插件),需指定接收端地址,这个例子中指定了'sink.ip'和'sink.port',也可指定'sink.node-urls',如下面的示例语句:

create pipe A2B
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

历史数据同步

本例子用来演示同步某个历史时间范围(2023年8月23日8点到2023年10月23日8点)的数据至另一个IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将source.realtime.enable参数配置为false;同时需要配置数据的起止时间start-time和end-time以及传输的模式mode,此处推荐mode设置为 hybrid 模式(hybrid模式为混合传输,在无数据积压时采用实时传输方式,有数据积压时采用批量传输方式,并根据系统内部情况自动切换)。

详细语句如下:

create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'source.realtime.enable' = 'false', 
'source.realtime.mode'='hybrid',
'source.history.start-time' = '2023.08.23T08:00:00+00:00',
'source.history.end-time' = '2023.10.23T08:00:00+00:00') 
with SINK (
'sink'='iotdb-thrift-async-sink',
'sink.node-urls'='xxxx:6668',
'sink.batch.enable'='false')

双向数据传输

本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示:

在这个例子中,为了避免数据无限循环,需要将A和B上的参数source.forwarding-pipe-requests 均设置为 false,表示不转发从另一pipe传输而来的数据。同时将'source.history.enable' 设置为 false,表示不传输历史数据,即不同步创建该任务前的数据。

详细语句如下:

在 A IoTDB 上执行下列语句:

create pipe AB
with source (
  'source.history.enable' = 'false',
  'source.forwarding-pipe-requests' = 'false',
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

在 B IoTDB 上执行下列语句:

create pipe BA
with source (
  'source.history.enable' = 'false',
  'source.forwarding-pipe-requests' = 'false',
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6667'
)

级联数据传输

本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由A集群同步至B集群,再同步至C集群,数据链路如下图所示:

在这个例子中,为了将A集群的数据同步至C,在BC之间的pipe需要将 source.forwarding-pipe-requests 配置为true,详细语句如下:

在A IoTDB上执行下列语句,将A中数据同步至B:

create pipe AB
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

在B IoTDB上执行下列语句,将B中数据同步至C:

create pipe BC
with source (
  'source.forwarding-pipe-requests' = 'true',
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6669'
)

跨网闸数据传输

本例子用来演示将一个 IoTDB 的数据,经过单向网闸,同步至另一个IoTDB的场景,数据链路如下图所示:

在这个例子中,需要使用 sink 任务中的iotdb-air-gap-sink 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句,其中ip和port填写网闸信息,详细语句如下:

create pipe A2B
with sink (
  'sink'='iotdb-air-gap-sink',
  'sink.ip'='10.53.53.53',
  'sink.port'='9780'
)

参考:注意事项

可通过修改 IoTDB 配置文件(iotdb-common.properties)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1

# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8

# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false

# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780

参考:参数说明

source 参数

keyvaluevalue 取值范围是否必填默认取值
sourceiotdb-sourceString: iotdb-source必填-
source.pattern用于筛选时间序列的路径前缀String: 任意的时间序列前缀选填root
source.history.enable是否同步历史数据Boolean: true, false选填true
source.history.start-time同步历史数据的开始 event time,包含 start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MIN_VALUE
source.history.end-time同步历史数据的结束 event time,包含 end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MAX_VALUE
source.realtime.enable是否同步实时数据Boolean: true, false选填true
source.realtime.mode实时数据的抽取模式String: hybrid, stream, batch选填hybrid
source.forwarding-pipe-requests是否转发由其他 Pipe (通常是数据同步)写入的数据Boolean: true, false选填true

💎 说明:历史数据与实时数据的差异

  • 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
  • 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
  • 全量数据: 全量数据 = 历史数据 + 实时数据

💎 ​说明:数据抽取模式hybrid, stream和batch的差异

  • hybrid(推荐):该模式下,任务将优先对数据进行实时处理、发送,当数据产生积压时自动切换至批量发送模式,其特点是平衡了数据同步的时效性和吞吐量
  • stream:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐
  • batch:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐

sink 参数

iotdb-thrift-sink

keyvaluevalue 取值范围是否必填默认取值
sinkiotdb-thrift-sink 或 iotdb-thrift-sync-sinkString: iotdb-thrift-sink 或 iotdb-thrift-sync-sink必填
sink.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip(请注意同步任务不支持向自身服务进行转发)String选填与 sink.node-urls 任选其一填写
sink.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 port(请注意同步任务不支持向自身服务进行转发)Integer选填与 sink.node-urls 任选其一填写
sink.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 sink.ip:sink.port 任选其一填写
sink.batch.enable是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPSBoolean: true, false选填true
sink.batch.max-delay-seconds在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)Integer选填1
sink.batch.size-bytes在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)Long选填

iotdb-air-gap-sink

keyvaluevalue 取值范围是否必填默认取值
sinkiotdb-air-gap-sinkString: iotdb-air-gap-sink必填
sink.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ipString选填与 sink.node-urls 任选其一填写
sink.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 portInteger选填与 sink.node-urls 任选其一填写
sink.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 urlString。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 sink.ip:sink.port 任选其一填写
sink.air-gap.handshake-timeout-ms发送端与接收端在首次尝试建立连接时握手请求的超时时长,单位:毫秒Integer选填5000

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.

We use Google Analytics to collect anonymous, aggregated usage information.