Flink-TsFile-Connector


Flink-TsFile-Connector adds support for TsFile as an external data source in Flink.
It lets users read and write TsFiles from Flink through the DataStream and DataSet APIs.

With this connector, you can:

  • load a single TsFile or multiple TsFiles (multiple files are supported only by the DataSet API), from either the local file system or HDFS, into Flink
  • load all files in a specific directory, from either the local file system or HDFS, into Flink

Quick Start

TsFileInputFormat Example

  1. Create a TsFileInputFormat with the default RowRowRecordParser.
// Column names of the resulting rows: the reserved time column plus one column per series path.
String[] fieldNames = {
	QueryConstant.RESERVED_TIME,
	"device_1.sensor_1",
	"device_1.sensor_2",
	"device_1.sensor_3",
	"device_2.sensor_1",
	"device_2.sensor_2",
	"device_2.sensor_3"
};
// Column types, listed in the same order as fieldNames.
TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
	Types.LONG,
	Types.FLOAT,
	Types.INT,
	Types.INT,
	Types.FLOAT,
	Types.INT,
	Types.INT
};
// Every column except the time column is a series path to query.
List<Path> paths = Arrays.stream(fieldNames)
	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
	.map(Path::new)
	.collect(Collectors.toList());
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// Passing null as the second argument means no filter expression is supplied.
QueryExpression queryExpression = QueryExpression.create(paths, null);
RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
  2. Read data from the input format and print it to stdout:

DataStream:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataStream<Row> source = senv.createInput(inputFormat);
DataStream<String> rowString = source.map(Row::toString);
Iterator<String> result = DataStreamUtils.collect(rowString);
while (result.hasNext()) {
	System.out.println(result.next());
}

DataSet:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataSet<Row> source = env.createInput(inputFormat);
List<String> result = source.map(Row::toString).collect();
for (String s : result) {
	System.out.println(s);
}
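
The input-format examples above rely on a positional convention: the i-th field name and the i-th type describe the same column, and every field other than the reserved time column is treated as a series path to query. The plain-Java sketch below illustrates that convention without any Flink or TsFile dependency. The class `FieldLayoutSketch`, its method `seriesPaths`, and the literal `"time"` (standing in for `QueryConstant.RESERVED_TIME`) are assumptions made for illustration and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;

final class FieldLayoutSketch {
    // Assumption: stands in for QueryConstant.RESERVED_TIME.
    static final String RESERVED_TIME = "time";

    /**
     * Returns the series paths implied by a row layout: every field name
     * except the time column, in declaration order. Rejects layouts whose
     * name and type arrays differ in length.
     */
    public static List<String> seriesPaths(String[] fieldNames, String[] typeNames) {
        if (fieldNames.length != typeNames.length) {
            throw new IllegalArgumentException(
                "expected one type per field, got " + fieldNames.length
                    + " names and " + typeNames.length + " types");
        }
        List<String> paths = new ArrayList<>();
        for (String name : fieldNames) {
            if (!RESERVED_TIME.equals(name)) {
                paths.add(name);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        String[] names = {"time", "device_1.sensor_1", "device_2.sensor_1"};
        String[] types = {"LONG", "FLOAT", "FLOAT"};
        // Prints [device_1.sensor_1, device_2.sensor_1]
        System.out.println(seriesPaths(names, types));
    }
}
```

A length check like this one is a cheap way to catch a name list and a type list that have drifted apart before the row type is built.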

TSRecordOutputFormat Example

  1. Create a TSRecordOutputFormat with the default RowTSRecordConverter.
// Column names of the rows to write: the reserved time column plus one column per series path.
String[] fieldNames = {
	QueryConstant.RESERVED_TIME,
	"device_1.sensor_1",
	"device_1.sensor_2",
	"device_1.sensor_3",
	"device_2.sensor_1",
	"device_2.sensor_2",
	"device_2.sensor_3"
};
// Column types, listed in the same order as fieldNames.
TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG,
	Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
// Declare sensor_1 to sensor_3 as INT64 measurements with TS_2DIFF encoding.
Schema schema = new Schema();
schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
  2. Write data via the output format:

DataStream:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
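// `path` is a String holding the location of the TsFile to write; it is not defined in this snippet.
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));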
DataStream<Tuple7> source = senv.fromCollection(
	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
	Row row = new Row(7);
	for (int i = 0; i < 7; i++) {
		row.setField(i, t.getField(i));
	}
	return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
senv.execute();

DataSet:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
DataSet<Tuple7> source = env.fromCollection(
	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
	Row row = new Row(7);
	for (int i = 0; i < 7; i++) {
		row.setField(i, t.getField(i));
	}
	return row;
}).returns(rowTypeInfo).write(outputFormat, path);
env.execute();
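
In the output examples above, each field name has the form `device.measurement`, while the schema template declares only the measurement names (`sensor_1` to `sensor_3`). The plain-Java sketch below shows one way a flat row could be regrouped into per-device records under that naming scheme. It assumes the device is everything before the last dot and the measurement is everything after it; the class `RowToRecordsSketch`, the method `groupByDevice`, and the literal `"time"` are hypothetical names used for illustration only, and this is not the connector's actual converter code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RowToRecordsSketch {
    // Assumption: stands in for QueryConstant.RESERVED_TIME.
    static final String RESERVED_TIME = "time";

    /**
     * Groups the values of one flat row by device. The field name is split at
     * its last dot: the left part is the device, the right part the measurement.
     * The time column is skipped. Insertion order is preserved.
     */
    public static Map<String, Map<String, Long>> groupByDevice(String[] fieldNames, long[] row) {
        if (fieldNames.length != row.length) {
            throw new IllegalArgumentException("row arity does not match field names");
        }
        Map<String, Map<String, Long>> byDevice = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            String name = fieldNames[i];
            if (RESERVED_TIME.equals(name)) {
                continue;
            }
            int dot = name.lastIndexOf('.');
            if (dot <= 0 || dot == name.length() - 1) {
                throw new IllegalArgumentException("not a device.measurement name: " + name);
            }
            String device = name.substring(0, dot);
            String measurement = name.substring(dot + 1);
            byDevice.computeIfAbsent(device, d -> new LinkedHashMap<>()).put(measurement, row[i]);
        }
        return byDevice;
    }

    public static void main(String[] args) {
        String[] names = {
            "time",
            "device_1.sensor_1", "device_1.sensor_2", "device_1.sensor_3",
            "device_2.sensor_1", "device_2.sensor_2", "device_2.sensor_3"
        };
        // First sample row from the examples above.
        long[] row = {1L, 2L, 3L, 4L, 5L, 6L, 7L};
        System.out.println(groupByDevice(names, row));
    }
}
```

For the first sample row, this yields two groups, `device_1` and `device_2`, each holding three measurements, which matches the three measurement schemas declared in the template.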

