Apache NiFi
2023年7月10日大约 4 分钟
Apache NiFi
Apache NiFi简介
Apache NiFi 是一个易用的、功能强大的、可靠的数据处理和分发系统。
Apache NiFi 支持强大的、可伸缩的数据路由、转换和系统中介逻辑的有向图。
Apache NiFi 包含以下功能:
- 基于浏览器的用户接口:
- 设计、控制、反馈和监控的无缝体验
- 数据起源跟踪
- 从头到尾完整的信息族谱
- 丰富的配置
- 丢失容忍和保证交付
- 低延迟和高吞吐
- 动态优先级策略
- 运行时可以修改流配置
- 反向压力控制
- 扩展设计
- 用于定制 processors 和 services 的组件体系结构
- 快速开发和迭代测试
- 安全会话
- 带有可配置认证策略的 HTTPS 协议
- 多租户授权和策略管理
- 包括TLS和SSH的加密通信的标准协议
PutIoTDBRecord
这是一个用于数据写入的处理器。它使用配置的 Record Reader 将传入 FlowFile 的内容读取为单独的记录,并使用本机接口将它们写入 Apache IoTDB。
PutIoTDBRecord的配置项
配置项 | 描述 | 默认值 | 是否必填 |
---|---|---|---|
Host | IoTDB 的主机名 | null | true |
Port | IoTDB 的端口 | 6667 | true |
Username | IoTDB 的用户名 | null | true |
Password | IoTDB 的密码 | null | true |
Prefix | 将被写入IoTDB的数据的tsName前缀 以root. 开头 可以使用Nifi expression language做动态替换. | null | true |
Time | 时间字段名 | null | true |
Record Reader | 指定一个 Record Reader controller service 来解析数据,并且推断数据格式。 | null | true |
Schema | IoTDB 需要的 schema 不能很好的被 NiFi 支持,因此你可以在这里自定义 schema。 除此之外,你可以通过这个方式设置编码和压缩类型。如果你没有设置这个配置,就会使用 Record Reader 推断的 schema。 这个配置可以通过 Attributes 的表达式来更新。 | null | false |
Aligned | 是否使用 aligned 接口? 这个配置可以通过 Attributes 的表达式来更新。 | false | false |
MaxRowNumber | 指定 tablet 的最大行数。 这个配置可以通过 Attributes 的表达式来更新。 | 1024 | false |
Flowfile 的推断数据类型
如果要使用推断类型,需要注意以下几点:
- 输入的 flowfile 需要能被
Record Reader
读取。 - flowfile的 schema 中必须包含以时间字段名属性命名的字段
Time
的数据类型只能是STRING
或者LONG
。- 除
Time
以外的列必须以root.
开头。 - 支持的数据类型有:
INT
,LONG
,FLOAT
,DOUBLE
,BOOLEAN
,TEXT
。
通过配置项自定义 schema
如上所述,通过配置项来自定义 schema 比起推断的 schema来说,是一种更加灵活和强大的方式。
Schema
配置项的解构如下:
{
"fields": [{
"tsName": "s1",
"dataType": "INT32",
"encoding": "RLE",
"compressionType": "GZIP"
}, {
"tsName": "s2",
"dataType": "INT64",
"encoding": "RLE",
"compressionType": "GZIP"
}]
}
注意
- flowfile 的第一列数据必须为
Time
。剩下的必须与fields
配置中保持一样的顺序。 - 定义 shema 的 JSON 中必须包含
timeType
andfields
这两项。 timeType
只支持LONG
和STRING
这两个选项。tsName
和dataType
这两项必须被设置。- 当数据插入IoTDB时,Prefix属性会被添加到 tsName以作为插入的字段名。
- 支持的
dataTypes
有:INT32
,INT64
,FLOAT
,DOUBLE
,BOOLEAN
,TEXT
。 - 支持的
encoding
有:PLAIN
,DICTIONARY
,RLE
,DIFF
,TS_2DIFF
,BITMAP
,GORILLA_V1
,REGULAR
,GORILLA
,ZIGZAG
,CHIMP
,SPRINTZ
,RLBE
。 - 支持的
compressionType
有:UNCOMPRESSED
,SNAPPY
,GZIP
,LZO
,SDT
,PAA
,PLA
,LZ4
,ZSTD
,LZMA2
。
Relationships
relationship | 描述 |
---|---|
success | 数据能被正确的写入。 |
failure | schema 或者数据有异常。 |
QueryIoTDBRecord
这是一个用于数据读取的处理器。它通过读取 FlowFile 的内容中的SQL 查询来对IoTDB的原生接口进行访问,并将查询结果用Record Writer写入 flowfile。
QueryIoTDBRecord的配置项
配置项 | 描述 | 默认值 | 是否必填 |
---|---|---|---|
Host | IoTDB 的主机名 | null | true |
Port | IoTDB 的端口 | 6667 | true |
Username | IoTDB 的用户名 | null | true |
Password | IoTDB 的密码 | null | true |
Record Writer | 指定一个 Record Writer controller service 来写入数据。 | null | true |
iotdb-query | 需要执行的IoTDB query 。 Note: 如果有连入侧的连接那么查询会从FlowFile的内容中提取,否则使用当前配置的属性 | null | false |
iotdb-query-chunk-size | 返回的结果可以进行分块,数据流中会返回一批按设置大小切分的数据,而不是一个单一的响应. 分块查询可以返回无限量的行。 注意: 数据分块只有在设置不为0时启用 | 0 | false |
Relationships
relationship | 描述 |
---|---|
success | 数据能被正确的写入。 |
failure | schema 或者数据有异常。 |