
TsFile-Flink Connector


TsFile-Flink-Connector adds Flink support for external data sources in the TsFile format. It lets users read, write, and query TsFile data through the Flink DataStream and DataSet APIs.

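With this connector, you can:

  • Load one or more TsFiles from the local file system or HDFS into Flink (as a DataSet only).
  • Load all files in a specific directory on the local file system or HDFS into Flink.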

Quick Start

TsFileInputFormat Example

  1. Create a TsFileInputFormat with the default RowRowRecordParser.
// Field names: the reserved time column followed by one "device.sensor" path per series.
String[] fieldNames = {
	QueryConstant.RESERVED_TIME,
	"device_1.sensor_1",
	"device_1.sensor_2",
	"device_1.sensor_3",
	"device_2.sensor_1",
	"device_2.sensor_2",
	"device_2.sensor_3"
};
// One Flink type per field, in the same order as fieldNames.
TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
	Types.LONG,
	Types.FLOAT,
	Types.INT,
	Types.INT,
	Types.FLOAT,
	Types.INT,
	Types.INT
};
// Every field except the time column is a series path to query.
// Path here is the TsFile Path class, not org.apache.flink.core.fs.Path.
List<Path> paths = Arrays.stream(fieldNames)
	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
	.map(Path::new)
	.collect(Collectors.toList());
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// No filter expression is passed (null), so no rows are filtered out.
QueryExpression queryExpression = QueryExpression.create(paths, null);
RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
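
The relationship between the field names, the type array, and the selected series can be checked in plain Java before any Flink or TsFile class is involved. The sketch below is not part of the connector: `SeriesSelection`, its two methods, and the literal `"time"` (standing in for `QueryConstant.RESERVED_TIME`) are assumptions made for illustration. It shows that every field except the time column becomes a selected series, and that names and types have to line up one to one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SeriesSelection {
    // Assumed stand-in for QueryConstant.RESERVED_TIME.
    static final String TIME_FIELD = "time";

    // Every field except the time column is a series path to query.
    static List<String> selectedSeries(String[] fieldNames) {
        List<String> series = new ArrayList<>();
        for (String name : fieldNames) {
            if (!name.equals(TIME_FIELD)) {
                series.add(name);
            }
        }
        return series;
    }

    // Pair each field name with its type name, rejecting arrays of different lengths.
    static Map<String, String> pairFields(String[] fieldNames, String[] typeNames) {
        if (fieldNames.length != typeNames.length) {
            throw new IllegalArgumentException(
                "expected " + fieldNames.length + " types, got " + typeNames.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            fields.put(fieldNames[i], typeNames[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] fieldNames = {TIME_FIELD, "device_1.sensor_1", "device_1.sensor_2", "device_2.sensor_1"};
        String[] typeNames = {"LONG", "FLOAT", "INT", "FLOAT"};
        System.out.println(selectedSeries(fieldNames));
        System.out.println(pairFields(fieldNames, typeNames));
    }
}
```

A check like `pairFields` fails fast when a field is added to one array but not the other, which is an easy mistake to make with two parallel arrays.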
  2. Read data from the input format and print it to stdout:

DataStream:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataStream<Row> source = senv.createInput(inputFormat);
DataStream<String> rowString = source.map(Row::toString);
Iterator<String> result = DataStreamUtils.collect(rowString);
while (result.hasNext()) {
	System.out.println(result.next());
}

DataSet:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataSet<Row> source = env.createInput(inputFormat);
List<String> result = source.map(Row::toString).collect();
for (String s : result) {
	System.out.println(s);
}

TSRecordOutputFormat Example

  1. Create a TSRecordOutputFormat with the default RowTSRecordConverter.
// Field names: the reserved time column followed by one "device.sensor" path per series.
String[] fieldNames = {
	QueryConstant.RESERVED_TIME,
	"device_1.sensor_1",
	"device_1.sensor_2",
	"device_1.sensor_3",
	"device_2.sensor_1",
	"device_2.sensor_2",
	"device_2.sensor_3"
};
// All seven fields, including the timestamp, are LONG in this example.
TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// Register the three sensors under one template as INT64 series with TS_2DIFF encoding.
Schema schema = new Schema();
schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
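
The schema registers `sensor_1` to `sensor_3` once, under a template, while the field names carry a device prefix. A minimal sketch of how such `device.sensor` names can be split and grouped by device follows. It is plain Java and purely illustrative: `FieldGrouping` and `groupByDevice` are hypothetical names, and splitting at the last dot is an assumption rather than the converter's documented behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FieldGrouping {
    // Group "device.sensor" field names by device, keeping first-seen order.
    // Assumption: the sensor name is the part after the last dot.
    static Map<String, List<String>> groupByDevice(String[] fieldNames, String timeField) {
        Map<String, List<String>> byDevice = new LinkedHashMap<>();
        for (String name : fieldNames) {
            if (name.equals(timeField)) {
                continue; // the time column belongs to no device
            }
            int dot = name.lastIndexOf('.');
            if (dot <= 0 || dot == name.length() - 1) {
                throw new IllegalArgumentException("not a device.sensor name: " + name);
            }
            String device = name.substring(0, dot);
            String sensor = name.substring(dot + 1);
            byDevice.computeIfAbsent(device, d -> new ArrayList<>()).add(sensor);
        }
        return byDevice;
    }

    public static void main(String[] args) {
        String[] fieldNames = {
            "time",
            "device_1.sensor_1", "device_1.sensor_2", "device_1.sensor_3",
            "device_2.sensor_1", "device_2.sensor_2", "device_2.sensor_3"
        };
        System.out.println(groupByDevice(fieldNames, "time"));
    }
}
```

Both devices end up with the same three sensor names, which is why a single template with three measurement schemas is enough for this example.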
  2. Write data through the output format:

DataStream:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
// path is the output TsFile location (a String), defined elsewhere.
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
DataStream<Tuple7> source = senv.fromCollection(
	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
	Row row = new Row(7);
	for (int i = 0; i < 7; i++) {
		row.setField(i, t.getField(i));
	}
	return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
senv.execute();

DataSet:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
DataSet<Tuple7> source = env.fromCollection(
	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
	Row row = new Row(7);
	for (int i = 0; i < 7; i++) {
		row.setField(i, t.getField(i));
	}
	return row;
}).returns(rowTypeInfo).write(outputFormat, path); // path: output TsFile location (a String), defined elsewhere
env.execute();
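
To make the sample data concrete: each seven-field row holds one timestamp followed by three values for `device_1` and three for `device_2`. The plain-Java sketch below splits one such row into per-device readings. `RowSplit` and `splitRow` are hypothetical helpers for illustration only, and the literal `"time"` is an assumed stand-in for `QueryConstant.RESERVED_TIME`; how the connector actually lays out records in the file is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RowSplit {
    // Split one row into device -> (sensor -> value).
    // Index 0 is the timestamp; every other field is named "device.sensor".
    static Map<String, Map<String, Long>> splitRow(String[] fieldNames, long[] row) {
        if (fieldNames.length != row.length) {
            throw new IllegalArgumentException("row has " + row.length
                + " values but there are " + fieldNames.length + " field names");
        }
        Map<String, Map<String, Long>> readings = new LinkedHashMap<>();
        for (int i = 1; i < fieldNames.length; i++) {
            int dot = fieldNames[i].lastIndexOf('.');
            String device = fieldNames[i].substring(0, dot);
            String sensor = fieldNames[i].substring(dot + 1);
            readings.computeIfAbsent(device, d -> new LinkedHashMap<>()).put(sensor, row[i]);
        }
        return readings;
    }

    public static void main(String[] args) {
        String[] fieldNames = {
            "time",
            "device_1.sensor_1", "device_1.sensor_2", "device_1.sensor_3",
            "device_2.sensor_1", "device_2.sensor_2", "device_2.sensor_3"
        };
        long[] row = {1L, 2L, 3L, 4L, 5L, 6L, 7L}; // first sample row
        System.out.println("t=" + row[0] + " " + splitRow(fieldNames, row));
        // prints: t=1 {device_1={sensor_1=2, sensor_2=3, sensor_3=4}, device_2={sensor_1=5, sensor_2=6, sensor_3=7}}
    }
}
```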

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation
