Apache NiFi
2023年7月10日大约 4 分钟
Apache NiFi
Apache NiFi简介
Apache NiFi 是一个易用的、功能强大的、可靠的数据处理和分发系统。
Apache NiFi 支持强大的、可伸缩的数据路由、转换和系统中介逻辑的有向图。
Apache NiFi 包含以下功能:
- 基于浏览器的用户接口:
- 设计、控制、反馈和监控的无缝体验
- 数据起源跟踪
- 从头到尾完整的信息族谱
- 丰富的配置
- 丢失容忍和保证交付
- 低延迟和高吞吐
- 动态优先级策略
- 运行时可以修改流配置
- 反向压力控制
- 扩展设计
- 用于定制 processors 和 services 的组件体系结构
- 快速开发和迭代测试
- 安全会话
- 带有可配置认证策略的 HTTPS 协议
- 多租户授权和策略管理
- 包括TLS和SSH的加密通信的标准协议
PutIoTDBRecord
这是一个用于数据写入的处理器。它使用配置的 Record Reader 将传入 FlowFile 的内容读取为单独的记录,并使用本机接口将它们写入 Apache IoTDB。
PutIoTDBRecord的配置项
| 配置项 | 描述 | 默认值 | 是否必填 |
|---|---|---|---|
| Host | IoTDB 的主机名 | null | true |
| Port | IoTDB 的端口 | 6667 | true |
| Username | IoTDB 的用户名 | null | true |
| Password | IoTDB 的密码 | null | true |
| Prefix | 将被写入IoTDB的数据的tsName前缀 以root. 开头 可以使用Nifi expression language做动态替换. | null | true |
| Time | 时间字段名 | null | true |
| Record Reader | 指定一个 Record Reader controller service 来解析数据,并且推断数据格式。 | null | true |
| Schema | IoTDB 需要的 schema 不能很好的被 NiFi 支持,因此你可以在这里自定义 schema。 除此之外,你可以通过这个方式设置编码和压缩类型。如果你没有设置这个配置,就会使用 Record Reader 推断的 schema。 这个配置可以通过 Attributes 的表达式来更新。 | null | false |
| Aligned | 是否使用 aligned 接口? 这个配置可以通过 Attributes 的表达式来更新。 | false | false |
| MaxRowNumber | 指定 tablet 的最大行数。 这个配置可以通过 Attributes 的表达式来更新。 | 1024 | false |
Flowfile 的推断数据类型
如果要使用推断类型,需要注意以下几点:
- 输入的 flowfile 需要能被
Record Reader读取。 - flowfile的 schema 中必须包含以时间字段名属性命名的字段
Time的数据类型只能是STRING或者LONG。- 除
Time以外的列必须以root.开头。 - 支持的数据类型有:
INT,LONG,FLOAT,DOUBLE,BOOLEAN,TEXT。
通过配置项自定义 schema
如上所述,通过配置项来自定义 schema 比起推断的 schema来说,是一种更加灵活和强大的方式。
Schema 配置项的解构如下:
{
"fields": [{
"tsName": "s1",
"dataType": "INT32",
"encoding": "RLE",
"compressionType": "GZIP"
}, {
"tsName": "s2",
"dataType": "INT64",
"encoding": "RLE",
"compressionType": "GZIP"
}]
}注意
- flowfile 的第一列数据必须为
Time。剩下的必须与fields配置中保持一样的顺序。 - 定义 shema 的 JSON 中必须包含
timeTypeandfields这两项。 timeType只支持LONG和STRING这两个选项。tsName和dataType这两项必须被设置。- 当数据插入IoTDB时,Prefix属性会被添加到 tsName以作为插入的字段名。
- 支持的
dataTypes有:INT32,INT64,FLOAT,DOUBLE,BOOLEAN,TEXT。 - 支持的
encoding有:PLAIN,DICTIONARY,RLE,DIFF,TS_2DIFF,BITMAP,GORILLA_V1,REGULAR,GORILLA,ZIGZAG,CHIMP,SPRINTZ,RLBE。 - 支持的
compressionType有:UNCOMPRESSED,SNAPPY,GZIP,LZO,SDT,PAA,PLA,LZ4,ZSTD,LZMA2。
Relationships
| relationship | 描述 |
|---|---|
| success | 数据能被正确的写入。 |
| failure | schema 或者数据有异常。 |
QueryIoTDBRecord
这是一个用于数据读取的处理器。它通过读取 FlowFile 的内容中的SQL 查询来对IoTDB的原生接口进行访问,并将查询结果用Record Writer写入 flowfile。
QueryIoTDBRecord的配置项
| 配置项 | 描述 | 默认值 | 是否必填 |
|---|---|---|---|
| Host | IoTDB 的主机名 | null | true |
| Port | IoTDB 的端口 | 6667 | true |
| Username | IoTDB 的用户名 | null | true |
| Password | IoTDB 的密码 | null | true |
| Record Writer | 指定一个 Record Writer controller service 来写入数据。 | null | true |
| iotdb-query | 需要执行的IoTDB query 。 Note: 如果有连入侧的连接那么查询会从FlowFile的内容中提取,否则使用当前配置的属性 | null | false |
| iotdb-query-chunk-size | 返回的结果可以进行分块,数据流中会返回一批按设置大小切分的数据,而不是一个单一的响应. 分块查询可以返回无限量的行。 注意: 数据分块只有在设置不为0时启用 | 0 | false |
Relationships
| relationship | 描述 |
|---|---|
| success | 数据能被正确的写入。 |
| failure | schema 或者数据有异常。 |
