跳至主要內容

Spark-TsFile

...大约 7 分钟

Spark-TsFile

About TsFile-Spark-Connector

TsFile-Spark-Connector 对 Tsfile 类型的外部数据源实现 Spark 的支持。 这使用户可以通过 Spark 读取,写入和查询 Tsfile。

使用此连接器,您可以

  • 从本地文件系统或 hdfs 加载单个 TsFile 到 Spark
  • 将本地文件系统或 hdfs 中特定目录中的所有文件加载到 Spark 中
  • 将数据从 Spark 写入 TsFile

System Requirements

Spark VersionScala VersionJava VersionTsFile
2.4.32.11.81.81.0.0

注意:有关如何下载和使用 TsFile 的更多信息,请参见以下链接:https://github.com/apache/iotdb/tree/master/tsfileopen in new window
注意:spark 版本目前仅支持 2.4.3, 其他版本可能存在不适配的问题,目前已知 2.4.7 的版本存在不适配的问题

快速开始

本地模式

在本地模式下使用 TsFile-Spark-Connector 启动 Spark:

./<spark-shell-path>  --jars  tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar,hadoop-tsfile-{version}-jar-with-dependencies.jar

分布式模式

在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即,Spark 集群通过 spark-shell 连接):

. /<spark-shell-path>   --jars  tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar,hadoop-tsfile-{version}-jar-with-dependencies.jar  --master spark://ip:7077

注意:

数据类型对应

TsFile 数据类型SparkSQL 数据类型
BOOLEANBooleanType
INT32IntegerType
INT64LongType
FLOATFloatType
DOUBLEDoubleType
TEXTStringType

模式推断

显示 TsFile 的方式取决于架构。 以以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

TsFile 中的现有数据如下:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02
timed1.statustimed1.temperaturetimed2.hardwaretimed2.status
1True12.22"aaa"1True
3True22.24"bbb"2False
5False32.16"ccc"4True

相应的 SparkSQL 表如下:

timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

您还可以使用如下所示的窄表形式:(您可以参阅第 6 部分,了解如何使用窄表形式)

timedevice_namestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

Scala API

注意:请记住预先分配必要的读写权限。

  • 示例 1:从本地文件系统读取
import org.apache.iotdb.spark.tsfile._
val wide_df = spark.read.tsfile("test.tsfile")  
wide_df.show

val narrow_df = spark.read.tsfile("test.tsfile", true)  
narrow_df.show
  • 示例 2:从 hadoop 文件系统读取
import org.apache.iotdb.spark.tsfile._
val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
wide_df.show

val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)  
narrow_df.show
  • 示例 3:从特定目录读取
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop") 
df.show

注 1:现在不支持目录中所有 TsFile 的全局时间排序。

注 2:具有相同名称的度量应具有相同的架构。

  • 示例 4:广泛形式的查询
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
newDf.show
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
  • 示例 5:缩小形式的查询
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
newDf.show
import org.apache.iotdb.spark.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
  • 例 6:写宽格式
// we only support wide_form table to write
import org.apache.iotdb.spark.tsfile._

val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile") 
df.show
df.write.tsfile("hdfs://localhost:9000/output")

val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
newDf.show
  • 例 7:写窄格式
// we only support wide_form table to write
import org.apache.iotdb.spark.tsfile._

val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true) 
df.show
df.write.tsfile("hdfs://localhost:9000/output", true)

val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
newDf.show

附录 A:模式推断的旧设计

显示 TsFile 的方式与 TsFile Schema 有关。 以以下 TsFile 结构为例:TsFile 架构中有三个度量:状态,温度和硬件。 这三个度量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

文件中的现有数据如下:

  • delta_object1: root.ln.wf01.wt01
  • delta_object2: root.ln.wf02.wt02
  • delta_object3: :root.sgcc.wf03.wt01
timedelta_object1.statustimedelta_object1.temperaturetimedelta_object2.hardwaretimedelta_object2.statustimedelta_object3.statustimedelta_object3.temperature
1True12.22"aaa"1True2True33.3
3True22.24"bbb"2False3True66.6
5False32.16"ccc"4True4True88.8
7True42.08"ddd"5False6True99.9

有两种显示方法:

  • 默认方式

将创建两列来存储设备的完整路径:time(LongType)和 delta_object(StringType)。

  • time:时间戳记,LongType
  • delta_object:Delta_object ID,StringType

接下来,为每个度量创建一列以存储特定数据。 SparkSQL 表结构如下:

time(LongType)delta_object(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1root.ln.wf01.wt01True2.2null
1root.ln.wf02.wt02Truenullnull
2root.ln.wf01.wt01null2.2null
2root.ln.wf02.wt02Falsenull"aaa"
2root.sgcc.wf03.wt01Truenullnull
3root.ln.wf01.wt01True2.1null
3root.sgcc.wf03.wt01True3.3null
4root.ln.wf01.wt01null2.0null
4root.ln.wf02.wt02Truenull"bbb"
4root.sgcc.wf03.wt01Truenullnull
5root.ln.wf01.wt01Falsenullnull
5root.ln.wf02.wt02Falsenullnull
5root.sgcc.wf03.wt01Truenullnull
6root.ln.wf02.wt02nullnull"ccc"
6root.sgcc.wf03.wt01null6.6null
7root.ln.wf01.wt01Truenullnull
8root.ln.wf02.wt02nullnull"ddd"
8root.sgcc.wf03.wt01null8.8null
9root.sgcc.wf03.wt01null9.9null
  • 展开 delta_object 列

通过“。”将设备列展开为多个列,忽略根目录“root”。方便进行更丰富的聚合操作。如果用户想使用这种显示方式,需要在表创建语句中设置参数“delta_object_name”(参考本手册 5.1 节中的示例 5),在本例中,将参数“delta_object_name”设置为“root.device.turbine”。路径层的数量必须是一对一的。此时,除了“根”层之外,为设备路径的每一层创建一列。列名是参数中的名称,值是设备相应层的名称。接下来,将为每个度量创建一个列来存储特定的数据。

那么 SparkSQL 表结构如下:

time(LongType)group(StringType)field(StringType)device(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1lnwf01wt01True2.2null
1lnwf02wt02Truenullnull
2lnwf01wt01null2.2null
2lnwf02wt02Falsenull"aaa"
2sgccwf03wt01Truenullnull
3lnwf01wt01True2.1null
3sgccwf03wt01True3.3null
4lnwf01wt01null2.0null
4lnwf02wt02Truenull"bbb"
4sgccwf03wt01Truenullnull
5lnwf01wt01Falsenullnull
5lnwf02wt02Falsenullnull
5sgccwf03wt01Truenullnull
6lnwf02wt02nullnull"ccc"
6sgccwf03wt01null6.6null
7lnwf01wt01Truenullnull
8lnwf02wt02nullnull"ddd"
8sgccwf03wt01null8.8null
9sgccwf03wt01null9.9null

TsFile-Spark-Connector 可以通过 SparkSQL 在 SparkSQL 中以表的形式显示一个或多个 tsfile。它还允许用户指定一个目录或使用通配符来匹配多个目录。如果有多个 tsfile,那么所有 tsfile 中的度量值的并集将保留在表中,并且具有相同名称的度量值在默认情况下具有相同的数据类型。注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不能保证结果的正确性。

写入过程是将数据 aframe 写入一个或多个 tsfile。默认情况下,需要包含两个列:time 和 delta_object。其余的列用作测量。如果用户希望将第二个表结构写回 TsFile,可以设置“delta_object_name”参数(请参阅本手册 5.1 节的 5.1 节)。

附录 B:旧注

注意:检查 Spark 根目录中的 jar 软件包,并将 libthrift-0.9.2.jar 和 libfb303-0.9.2.jar 分别替换为 libthrift-0.9.1.jar 和 libfb303-0.9.1.jar。

Copyright © 2024 The Apache Software Foundation.
Apache IoTDB, IoTDB, Apache, the Apache feather logo, and the Apache IoTDB project logo are either registered trademarks or trademarks of The Apache Software Foundation in all countries

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.