大约 4 分钟

Hadoop-TsFile

TsFile 的 Hadoop 连接器实现了对 Hadoop 读取外部 Tsfile 类型的文件格式的支持。让用户可以使用 Hadoop 的 map、reduce 等操作对 Tsfile 文件进行读取、写入和查询。

有了这个连接器,用户可以

  • 将单个 Tsfile 文件加载进 Hadoop,不论文件是存储在本地文件系统或者是 HDFS 中
  • 将某个特定目录下的所有文件加载进 Hadoop,不论文件是存储在本地文件系统或者是 HDFS 中
  • 将 Hadoop 处理完后的结果以 Tsfile 的格式保存

系统环境要求

Hadoop 版本Java 版本TsFile 版本
2.7.31.80.13.0-SNAPSHOT+

注意:关于如何下载和使用 Tsfile, 请参考以下链接:https://github.com/apache/iotdb/tree/master/tsfileopen in new window.

数据类型对应关系

TsFile 数据类型Hadoop writable
BOOLEANBooleanWritable
INT32IntWritable
INT64LongWritable
FLOATFloatWritable
DOUBLEDoubleWritable
TEXTText

关于 TSFInputFormat 的说明

TSFInputFormat 继承了 Hadoop 中 FileInputFormat 类,重写了其中切片的方法。

目前的切片方法是根据每个 ChunkGroup 的中点的 offset 是否属于 Hadoop 所切片的 startOffset 和 endOffset 之间,来判断是否将该 ChunkGroup 放入此切片。

TSFInputFormat 将 tsfile 中的数据以多个MapWritable记录的形式返回给用户。

假设我们想要从 Tsfile 中获得名为d1的设备的数据,该设备有三个传感器,名称分别为s1, s2, s3

s1的类型是BOOLEAN, s2的类型是 DOUBLE, s3的类型是TEXT.

MapWritable的结构如下所示:

{
    "time_stamp": 10000000,
    "device_id":  d1,
    "s1":         true,
    "s2":         3.14,
    "s3":         "middle"
}

在 Hadoop 的 Map job 中,你可以采用如下方法获得你想要的任何值

mapwritable.get(new Text("s1"))

注意:MapWritable中所有的键值类型都是Text

使用示例

读示例:求和

首先,我们需要在 TSFInputFormat 中配置我们需要哪些数据

// configure reading time enable
TSFInputFormat.setReadTime(job, true); 
// configure reading deviceId enable
TSFInputFormat.setReadDeviceId(job, true); 
// configure reading which deltaObjectIds
String[] deviceIds = {"device_1"};
TSFInputFormat.setReadDeviceIds(job, deltaObjectIds);
// configure reading which measurementIds
String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
TSFInputFormat.setReadMeasurementIds(job, measurementIds);

然后,必须指定 mapper 和 reducer 输出的键和值类型

// set inputformat and outputformat
job.setInputFormatClass(TSFInputFormat.class);
// set mapper output key and value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
// set reducer output key and value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

接着,就可以编写包含具体的处理数据逻辑的mapperreducer类了。

public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

	@Override
	protected void map(NullWritable key, MapWritable value,
	    Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
	    throws IOException, InterruptedException {
	
	  Text deltaObjectId = (Text) value.get(new Text("device_id"));
	  context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
	}
}

public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

	@Override
	protected void reduce(Text key, Iterable<DoubleWritable> values,
	    Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
	    throws IOException, InterruptedException {
	
	  double sum = 0;
	  for (DoubleWritable value : values) {
	    sum = sum + value.get();
	  }
	  context.write(key, new DoubleWritable(sum));
	}
}

注意:完整的代码示例可以在如下链接中找到:https://github.com/apache/iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFMRReadExample.javaopen in new window

写示例:计算平均数并写入 Tsfile 中

除了OutputFormatClass,剩下的配置代码跟上面的读示例是一样的

job.setOutputFormatClass(TSFOutputFormat.class);
// set reducer output key and value
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(HDFSTSRecord.class);

然后,是包含具体的处理数据逻辑的mapperreducer类。

public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {

    @Override
    protected void map(NullWritable key, MapWritable value,
                       Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
            throws IOException, InterruptedException {

        Text deltaObjectId = (Text) value.get(new Text("device_id"));
        long timestamp = ((LongWritable)value.get(new Text("timestamp"))).get();
        if (timestamp % 100000 == 0) {
            context.write(deltaObjectId, new MapWritable(value));
        }
    }
}

/**
 * This reducer calculate the average value.
 */
public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values,
                          Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context) throws IOException, InterruptedException {
        long sensor1_value_sum = 0;
        long sensor2_value_sum = 0;
        double sensor3_value_sum = 0;
        long num = 0;
        for (MapWritable value : values) {
            num++;
            sensor1_value_sum += ((LongWritable)value.get(new Text("sensor_1"))).get();
            sensor2_value_sum += ((LongWritable)value.get(new Text("sensor_2"))).get();
            sensor3_value_sum += ((DoubleWritable)value.get(new Text("sensor_3"))).get();
        }
        HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
        DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
        DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
        DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
        tsRecord.addTuple(dPoint1);
        tsRecord.addTuple(dPoint2);
        tsRecord.addTuple(dPoint3);
        context.write(NullWritable.get(), tsRecord);
    }
}

注意:完整的代码示例可以在如下链接中找到:https://github.com/apache/iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSMRWriteExample.javaopen in new window

Copyright © 2023 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.

We use Google Analytics to collect anonymous, aggregated usage information.