跳至主要內容

大约 7 分钟

Spark-TsFile

About TsFile-Spark-Connector

TsFile-Spark-Connector 对 Tsfile 类型的外部数据源实现 Spark 的支持。 这使用户可以通过 Spark 读取,写入和查询 Tsfile。

使用此连接器,您可以

  • 从本地文件系统或 hdfs 加载单个 TsFile 到 Spark
  • 将本地文件系统或 hdfs 中特定目录中的所有文件加载到 Spark 中
  • 将数据从 Spark 写入 TsFile

System Requirements

Spark VersionScala VersionJava VersionTsFile
2.4.32.11.81.80.13.0-SNAPSHOT

注意:有关如何下载和使用 TsFile 的更多信息,请参见以下链接:https://github.com/apache/iotdb/tree/master/tsfileopen in new window
注意:spark 版本目前仅支持 2.4.3, 其他版本可能存在不适配的问题,目前已知 2.4.7 的版本存在不适配的问题

快速开始

本地模式

在本地模式下使用 TsFile-Spark-Connector 启动 Spark:

./<spark-shell-path>  --jars  tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar,hadoop-tsfile-{version}-jar-with-dependencies.jar

分布式模式

在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即,Spark 集群通过 spark-shell 连接):

. /<spark-shell-path>   --jars  tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar,hadoop-tsfile-{version}-jar-with-dependencies.jar  --master spark://ip:7077

注意:

数据类型对应

TsFile 数据类型SparkSQL 数据类型
BOOLEANBooleanType
INT32IntegerType
INT64LongType
FLOATFloatType
DOUBLEDoubleType
TEXTStringType

模式推断

显示 TsFile 的方式取决于架构。 以以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

TsFile 中的现有数据如下:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02

time|d1.status|time|d1.temperature |time | d2.hardware |time|d2.status
---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ----
1|True |1|2.2|2|"aaa"|1|True
3|True |2|2.2|4|"bbb"|2|False
5|False|3 |2.1|6 |"ccc"|4|True

相应的 SparkSQL 表如下:

timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

您还可以使用如下所示的窄表形式:(您可以参阅第 6 部分,了解如何使用窄表形式)

timedevice_namestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

Scala API

注意:请记住预先分配必要的读写权限。

  • 示例 1:从本地文件系统读取
import org.apache.iotdb.spark.tsfile._
val wide_df = spark.read.tsfile("test.tsfile")  
wide_df.show

val narrow_df = spark.read.tsfile("test.tsfile", true)  
narrow_df.show
  • 示例 2:从 hadoop 文件系统读取
import org.apache.iotdb.spark.tsfile._
val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
wide_df.show

val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)  
narrow_df.show
  • 示例 3:从特定目录读取
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop") 
df.show

注 1:现在不支持目录中所有 TsFile 的全局时间排序。

注 2:具有相同名称的度量应具有相同的架构。

  • 示例 4:广泛形式的查询
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
newDf.show
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
  • 示例 5:缩小形式的查询
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
newDf.show
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
  • 例 6:写宽格式
// we only support wide_form table to write
import org.apache.iotdb.spark.tsfile._

val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.show
df.write.tsfile("hdfs://localhost:9000/output")

val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
newDf.show
  • 例 7:写窄格式
// we only support wide_form table to write
import org.apache.iotdb.spark.tsfile._

val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.show
df.write.tsfile("hdfs://localhost:9000/output", true)

val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
newDf.show

附录 A:模式推断的旧设计

显示 TsFile 的方式与 TsFile Schema 有关。 以以下 TsFile 结构为例:TsFile 架构中有三个度量:状态,温度和硬件。 这三个度量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

文件中的现有数据如下:

  • delta_object1: root.ln.wf01.wt01
  • delta_object2: root.ln.wf02.wt02
  • delta_object3: :root.sgcc.wf03.wt01

time|delta_object1.status|time|delta_object1.temperature |time | delta_object2.hardware |time|delta_object2.status |time|delta_object3.status|time|delta_object3.temperature
---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ----
1|True |1|2.2|2|"aaa"|1|True|2|True|3|3.3
3|True |2|2.2|4|"bbb"|2|False|3|True|6|6.6
5|False|3 |2.1|6 |"ccc"|4|True|4|True|8|8.8
7|True|4|2.0|8|"ddd"|5|False|6|True|9|9.9

有两种显示方法:

  • 默认方式

将创建两列来存储设备的完整路径:time(LongType)和 delta_object(StringType)。

  • time:时间戳记,LongType
  • delta_object:Delta_object ID,StringType

接下来,为每个度量创建一列以存储特定数据。 SparkSQL 表结构如下:

time(LongType)delta_object(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1root.ln.wf01.wt01True2.2null
1root.ln.wf02.wt02Truenullnull
2root.ln.wf01.wt01null2.2null
2root.ln.wf02.wt02Falsenull"aaa"
2root.sgcc.wf03.wt01Truenullnull
3root.ln.wf01.wt01True2.1null
3root.sgcc.wf03.wt01True3.3null
4root.ln.wf01.wt01null2.0null
4root.ln.wf02.wt02Truenull"bbb"
4root.sgcc.wf03.wt01Truenullnull
5root.ln.wf01.wt01Falsenullnull
5root.ln.wf02.wt02Falsenullnull
5root.sgcc.wf03.wt01Truenullnull
6root.ln.wf02.wt02nullnull"ccc"
6root.sgcc.wf03.wt01null6.6null
7root.ln.wf01.wt01Truenullnull
8root.ln.wf02.wt02nullnull"ddd"
8root.sgcc.wf03.wt01null8.8null
9root.sgcc.wf03.wt01null9.9null
  • 展开 delta_object 列

通过“。”将设备列展开为多个列,忽略根目录“root”。方便进行更丰富的聚合操作。如果用户想使用这种显示方式,需要在表创建语句中设置参数“delta_object_name”(参考本手册 5.1 节中的示例 5),在本例中,将参数“delta_object_name”设置为“root.device.turbine”。路径层的数量必须是一对一的。此时,除了“根”层之外,为设备路径的每一层创建一列。列名是参数中的名称,值是设备相应层的名称。接下来,将为每个度量创建一个列来存储特定的数据。

那么 SparkSQL 表结构如下:

time(LongType)group(StringType)field(StringType)device(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1lnwf01wt01True2.2null
1lnwf02wt02Truenullnull
2lnwf01wt01null2.2null
2lnwf02wt02Falsenull"aaa"
2sgccwf03wt01Truenullnull
3lnwf01wt01True2.1null
3sgccwf03wt01True3.3null
4lnwf01wt01null2.0null
4lnwf02wt02Truenull"bbb"
4sgccwf03wt01Truenullnull
5lnwf01wt01Falsenullnull
5lnwf02wt02Falsenullnull
5sgccwf03wt01Truenullnull
6lnwf02wt02nullnull"ccc"
6sgccwf03wt01null6.6null
7lnwf01wt01Truenullnull
8lnwf02wt02nullnull"ddd"
8sgccwf03wt01null8.8null
9sgccwf03wt01null9.9null

TsFile-Spark-Connector 可以通过 SparkSQL 在 SparkSQL 中以表的形式显示一个或多个 tsfile。它还允许用户指定一个目录或使用通配符来匹配多个目录。如果有多个 tsfile,那么所有 tsfile 中的度量值的并集将保留在表中,并且具有相同名称的度量值在默认情况下具有相同的数据类型。注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不能保证结果的正确性。

写入过程是将数据 aframe 写入一个或多个 tsfile。默认情况下,需要包含两个列:time 和 delta_object。其余的列用作测量。如果用户希望将第二个表结构写回 TsFile,可以设置“delta_object_name”参数(请参阅本手册 5.1 节的 5.1 节)。

附录 B:旧注

注意:检查 Spark 根目录中的 jar 软件包,并将 libthrift-0.9.2.jar 和 libfb303-0.9.2.jar 分别替换为 libthrift-0.9.1.jar 和 libfb303-0.9.1.jar。

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.