Apache Flink(TsFile)
July 10, 2023, about a 2-minute read
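About the TsFile-Flink Connector
TsFile-Flink-Connector adds Flink support for external data sources in the TsFile format, so that users can read, write, and query TsFile data through the Flink DataStream and DataSet APIs.
With this connector, you can:
- Load one or more TsFiles from the local file system or HDFS into Flink (supported only as a DataSet).
- Load all files in a specific directory, on the local file system or HDFS, into Flink.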
Quick Start
TsFileInputFormat Example
- Create a TsFileInputFormat with the default RowRowRecordParser.
// Column names of the resulting rows: the timestamp first, then one column per time series.
String[] fieldNames = {
    QueryConstant.RESERVED_TIME,
    "device_1.sensor_1",
    "device_1.sensor_2",
    "device_1.sensor_3",
    "device_2.sensor_1",
    "device_2.sensor_2",
    "device_2.sensor_3"
};
// Flink type of each column, in the same order as fieldNames.
TypeInformation[] typeInformations = new TypeInformation[] {
    Types.LONG,
    Types.FLOAT,
    Types.INT,
    Types.INT,
    Types.FLOAT,
    Types.INT,
    Types.INT
};
// Every field except the timestamp becomes a series path to query.
List<Path> paths = Arrays.stream(fieldNames)
    .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
    .map(Path::new)
    .collect(Collectors.toList());
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// The second argument is null, so no filter expression is applied.
QueryExpression queryExpression = QueryExpression.create(paths, null);
RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
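The list of query paths above is derived from the field names by dropping the reserved time column. The following stdlib-only sketch reproduces just that filtering step, so it can be run without Flink or TsFile on the classpath. The class name `SelectedSeriesSketch`, the method `selectSeries`, and the literal `"time"` standing in for `QueryConstant.RESERVED_TIME` are assumptions made for illustration; check the constant's actual value in the TsFile version you use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SelectedSeriesSketch {
    // Assumption: a stand-in for QueryConstant.RESERVED_TIME.
    public static final String RESERVED_TIME = "time";

    // Returns every field name except the reserved time column, keeping the original order.
    public static List<String> selectSeries(String[] fieldNames) {
        return Arrays.stream(fieldNames)
                .filter(name -> !name.equals(RESERVED_TIME))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] fieldNames = {RESERVED_TIME, "device_1.sensor_1", "device_1.sensor_2"};
        // prints [device_1.sensor_1, device_1.sensor_2]
        System.out.println(selectSeries(fieldNames));
    }
}
```

The row type still contains the time column, which is why it is filtered out only when building the query paths and not when building the row type information.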
- Read data from the input format and print it to stdout:
DataStream:
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataStream<Row> source = senv.createInput(inputFormat);
DataStream<String> rowString = source.map(Row::toString);
Iterator<String> result = DataStreamUtils.collect(rowString);
while (result.hasNext()) {
System.out.println(result.next());
}
DataSet:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataSet<Row> source = env.createInput(inputFormat);
List<String> result = source.map(Row::toString).collect();
for (String s : result) {
System.out.println(s);
}
TSRecordOutputFormat Example
- Create a TSRecordOutputFormat with the default RowTSRecordConverter.
// Column names of the rows to write: the timestamp first, then one column per time series.
String[] fieldNames = {
    QueryConstant.RESERVED_TIME,
    "device_1.sensor_1",
    "device_1.sensor_2",
    "device_1.sensor_3",
    "device_2.sensor_1",
    "device_2.sensor_2",
    "device_2.sensor_3"
};
// In this example every column, including the timestamp, is a LONG.
TypeInformation[] typeInformations = new TypeInformation[] {
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// Declare the three measurements as INT64 with TS_2DIFF encoding.
Schema schema = new Schema();
schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
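The field names in this example follow a `device.measurement` pattern, while the schema registers only the measurement part (`sensor_1`, `sensor_2`, `sensor_3`). The sketch below shows one plausible way such names can be grouped by device, splitting at the last dot. It illustrates the naming convention under that assumption and is not the connector's actual code; `FieldNameSketch` and `groupByDevice` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldNameSketch {
    // Groups "device.measurement" names by device, preserving first-seen order.
    // Names without a dot (such as a time column) are skipped.
    public static Map<String, List<String>> groupByDevice(String[] fieldNames) {
        Map<String, List<String>> byDevice = new LinkedHashMap<>();
        for (String name : fieldNames) {
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                continue; // no device prefix, so this is not a series column
            }
            String device = name.substring(0, dot);
            String measurement = name.substring(dot + 1);
            byDevice.computeIfAbsent(device, k -> new ArrayList<>()).add(measurement);
        }
        return byDevice;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"time", "device_1.sensor_1", "device_1.sensor_2", "device_2.sensor_1"};
        // prints {device_1=[sensor_1, sensor_2], device_2=[sensor_1]}
        System.out.println(groupByDevice(fieldNames));
    }
}
```

Under this reading, one input row with six value columns carries values for two devices that share the same three measurement names.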
- Write data through the output format:
DataStream:
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
// path is a String with the target TsFile location; it is not defined in this snippet.
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
DataStream<Tuple7> source = senv.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
senv.execute();
DataSet:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
DataSet<Tuple7> source = env.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).write(outputFormat, path); // path: target TsFile location, not defined in this snippet
env.execute();