跳至主要內容

数据同步软件

大约 8 分钟

数据同步软件

除了内置的 Pipe 同步功能之外,IoTDB 还提供外置的数据同步软件。该软件由 Java 编写,因此具有跨平台性,在 Windows 和 Linux 下都可以执行同步功能。该软件除 Pipe 的基础功能外,还添加了额外功能。该软件支持命令行中以 SQL 语句执行,此外还支持图形化界面。

连接方法

在使用数据同步软件时,首先需定位发送端集群。为允许发送端集群出于关闭状态,此处除可以通过 ip:port 定位集群,也可以给定发送端集群的 data 目录。将以下两组配置其一写入 data-sync.properties 即可:

  • ssh_port, ssh_user, ssh_password, data_dir:用于定位数据目录位置,当 ssh 未配置时默认为本机。
  • user, password, dn_rpc_address, dn_rpc_port:用于定位运行中的 IoTDB 位置,user 和 password 可不填写,默认为 root。

软件特性及用法

数据同步软件支持 SQL 及图形化界面的交互。其中,SQL 语句的用法与 IoTDB Pipe 的命令行用法相同。下面先介绍创建同步任务的 SQL 语句:

创建同步任务

create pipe p1 
  with extractor (
    ....
  ) 
  with processor (
    ....
  ) 
  with connector (
    ....
  )

其中 extractor,processor,connector 均为可自定义的插件。extractor 为数据的收集器,负责收集 IoTDB 内的特定数据;processor 为处理器,负责对收集到的数据进行过滤等处理;connector 为连接器,负责对数据进行发送。此外,with extractor, with processor 两段均可不填,不填时,extractor 与 processor 均为默认值。上述命令中(....)部分为可配置的参数,容忍无效参数,具体有效参数取决于插件实现本身。

下面将列出该软件的所有特性及其 SQL 用法。

数据收集

目前软件自带的 Extractor 为 iotdb-extractor,该 extractor 支持对任意前缀路径进行同步,即支持选定任意数据库,设备和时间序列。此外,还可以选择同步历史数据 / 实时数据或是两者,还支持规定历史数据的起始时间和截止时间。示例的 extractor 参数如下:

create pipe p1 
    with extractor (
    'extractor'='iotdb-extractor',
    'extractor.pattern'='root', 
    'extractor.history.enable'='true',
    'extractor.history.start-time'='2023-07-03T16:49:58.845+08:00', 
    'extractor.history.end-time'='2023-07-04T16:49:58.845+08:00',
    'extractor.realtime.enable'='true',
    'extractor.realtime.mode'='log'
    )
    with processor (
      ....
    )
    with connector (
      ....
    )

其中,各参数的含义如下:

参数名参数说明是否必需
extractor通用配置,表示选用的 extractor 类型,这里是软件自带的 iotdb-extractor。否,默认为 iotdb-extractor
extractor.pattern为 iotdb-extractor 规定的特定配置,之后的参数相同。这里为选定的数据前缀路径。否,默认为 root
extractor.history.enable规定是否可以同步历史数据,取值为 true 或 false。否,默认为 true
extractor.history.start-time为历史数据的截取开始时间戳,用于截取某一时间段的数据。否,默认为历史数据开始时间
extractor.history.end-time为历史数据的截取停止时间戳,用于截取某一时间段的数据。否,默认为历史数据结束时间
extractor.realtime.enable规定是否可以同步实时数据,取值为 true 或 false否,默认为true
extractor.realtime.mode规定实时数据的同步方式,取值为 log,hybrid,file,表示基于文件,WAL或混合同步否,默认为hybrid

数据处理

此外,该软件支持对选择的数据进行处理。目前的 processor 内置了一些简单功能,例如基于某个字段取值的过滤,选择和重命名等,还可以自定义流处理算法,来对收集到的数据进行处理,例如滑动平均等。一些较为简单的功能已经封装在软件内部,而如果想要自定义复杂功能,可以编写相关的类作为数据的处理插件。

无操作

无操作时,使用 do-nothing-processor 即可。示例的 processor 参数如下:

create pipe p1 
  with extractor (
    ....
  )
  with processor (
    'processor'='do-nothing-processor'
  )
  with connector (
    ....
  )

与上述相似,此处的 processor 为通用配置,表示选用的 processor 类型。该配置非必选项,不填时即默认为 do-nothing-processor。

取值过滤及选择

使用自带的取值过滤 processor 可以根据 IoTDB 点的取值进行过滤。示例的 processor 参数如下:

create pipe p1 
  with extractor (
    ....
  )
  with processor (
    'processor'='filter-processor',
    'processor.include.condition.type'='double',
    'processor.include.condition'='>1',
    'processor.exclude.condition.type'='double',
    'processor.exclude.condition'='>=2'
  )
  with connector (
    ....
  )

此处的 processor.include.condition 为选择某个取值的条件,processor.exclude.condition 为过滤某个取值的条件,二者必填其一。此处的参数表示选取收集的数据中,类型为 double 且大于 1 小于 2 的数据。

取值重写

使用取值重写 processor 可以根据 IoTDB 点的取值进行改写。processor 参数如下:

create pipe p1 
  with extractor (
    ....
  ) 
  with processor (
    'processor'='rewrite-processor',
    'processor.rewrite.condition.type'='double',
    'processor.rewrite.condition'='>1',
    'processor.rewrite.newValue'='1'
  )
  with connector (
    ....
  )

此处的 processor.rewrite.condition 表示进行重写的判断条件,必填;processor.rewrite.newValue 表示进行重写的新值,同样必填。此处的参数表示将收集的数据中,类型为 double 且值大于 1 的数据改为 1,其他数据不变。

字段重命名

除了取值重写的功能之外,软件还支持对字段的重命名功能。该功能的参数如下:

create pipe p1 
  with extractor (
    ....
  ) 
  with processor (
    'processor'='rename-processor',
    'processor.rename.oldPattern'='root.testpipe.d0',
    'processor.rename.newPattern'='root.receive.d1'
  )
  with connector (
    ....
  )

此处的 processor.rename.oldPattern 表示被重命名的序列,processor.rename.newPattern 表示序列的新名称。两者均为必填。

数据发送

数据同步软件提供了内置的多种 connector,这些 connector 的类型如下:

Thrift connector

支持使用 Thrift 方式,将数据同步到 IoTDB 的接收端。采用 Thrift 方式的同步参数如下:

  • iotdb-thrift-connector
  • iotdb-thrift-connector-v1
  • iotdb-thrift-connector-v2

其中,iotdb-thrift-connector-v1 会选择某个接收端地址进行发送,通常在单个地址时效率较高;iotdb-thrift-connector-v2 会并发在所有接收端地址进行发送,在多个地址时效率较高。
iotdb-thrift-connector 会选择当前版本默认的 connector 进行发送,目前为 iotdb-thrift-connector-v1。以上 connector 公用相关参数,其取值示例如下:

create pipe p1 
  with extractor (
    ....
  )
  with processor (
    ....
  )
  with connector (
    'connector'='iotdb-thrift-connector',
    'connector.ip'='xxx.xxx.xxx.xxx', 
    'connector.port'='xxxx',
    'connector.node-urls'='xxx.xxx.xxx.xxx:xxxx,yyy.yyy.yyy.yyy:yyyy',
    'connector.compression'='zstd'
  )
参数名参数说明是否必需
connector通用配置,表示选用的 connector 类型,这里是 iotdb-thrift-connector, iotdb-thrift-connector-v1 或 iotdb-thrift-connector-v2。否,默认为iotdb-thrift-connector
connector.ip表示接收端选定 IoTDB 的 IP 地址与 node-urls 必选其一
connector.port表示接收端选定 IoTDB 的端口与 node-urls 必选其一
connector.node-urls表示接收端集群的地址列表,与上面的 ip/port 可以共存与 ip/port 必选其一
connector.compression表示发送时 tsFile 使用的二次压缩算法否,默认为不压缩

InfluxDB connector

此外,使用 InfluxDB connector,还可以将上述经过筛选、处理的数据同步到 InfluxDB。该 Connector 的名称为 influxdb-connector。目前仅支持单点传输。参数取值示例如下:

create pipe p1 
  with extractor (
    ....
  )
  with processor (
    ....
  ) 
  with connector (
    'connector'='influxdb-connector',
    'connector.ip'='xxx.xxx.xxx.xxx',
    'connector.port'='xxxx'
  )

这里的参数名和参数说明同上,但 ip 和 port 此时为必选项。

Local file backup connector

Local file backup connector 能够将 IoTDB 内部的 tsFile 文件备份至本地。此时的参数为:

create pipe p1 
  with extractor (
    ....
  )
  with processor (
    ....
  )
  with connector (
    'connector'='local-file-backup-connector',
    'connector.path'='/usr/local', 
    'connector.compression'='zstd'
  )

这里的 connector.path 指要备份的文件目录,必填。connector.compression 为二次压缩方式,可选,非空时将根据 compression 的类型对打包出的 tsFile 进行二次压缩。

除了以上三类 connector 外,用户还可以自定义 connector 来实现数据的自定义发送。理论上,可以将数据以任意压缩格式,通过任何方法,发送至任何端口。

REST api

与 IoTDB 自带的 Pipe 功能相同,该软件也提供了使用 REST 接口的访问方式,能够查看同步任务的启停和执行状态等。下面是一个示例的 REST 接口访问方法:

curl -H "Content-Type:application/json" -H "Authorization:Basic cm9vdDpyb290" -X POST --data '{"sql":"show pipes"}' http://127.0.0.1:18080/rest/v2/query

其中示例中 Authorization: Basic cm9vdDpyb2901 字符串为用户名 root,密码 root 对应的 Basic 鉴权 Header 格式。用户名、密码对应的完整鉴权格式为 base64.encode(username + ':' + password),如果手动设置了用户名及密码,需要按照该格式来编写鉴权字符串。
SQL 中的 show pipes 与 IoTDB 中的对应格式相同,18080 则为配置的 REST 端口。

此处的 REST api 也提供了配置文件,在项目根目录下的 data-sync.properties 中。其中能够对 REST 服务是否启用,用户名,密码及 REST 对外的端口进行配置。

开启同步任务

开启同步任务的 SQL 同样与 IoTDB 内置 Pipe 的定义方式相同,如下:

start pipe p1

停止同步任务

stop pipe p1

删除同步任务

drop pipe p1

Copyright © 2023 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.

We use Google Analytics to collect anonymous, aggregated usage information.