跳至主要內容

Spark-IoTDB 用户手册

...大约 3 分钟

Spark-IoTDB 用户手册

版本支持

支持的 Spark 与 Scala 版本如下:

Spark 版本Scala 版本
2.4.0-latest2.11, 2.12

注意事项

  1. 当前版本的 spark-iotdb-connector 支持 2.112.12 两个版本的 Scala,暂不支持 2.13 版本。
  2. spark-iotdb-connector 支持在 Java、Scala 版本的 Spark 与 PySpark 中使用。

部署

spark-iotdb-connector 总共有两个使用场景,分别为 IDE 开发与 spark-shell 调试。

IDE 开发

在 IDE 开发时,只需要在 pom.xml 文件中添加以下依赖即可:

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <!-- spark-iotdb-connector_2.11 or spark-iotdb-connector_2.13 -->
      <artifactId>spark-iotdb-connector_2.12.10</artifactId>
      <version>${iotdb.version}</version>
    </dependency>

spark-shell 调试

如果需要在 spark-shell 中使用 spark-iotdb-connetcor,需要先在官网下载 with-dependencies 版本的 jar 包。然后再将 Jar 包拷贝到 ${SPARK_HOME}/jars 目录中即可。
执行以下命令即可:

cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

使用

参数

参数描述默认值使用范围能否为空
url指定 IoTDB 的 JDBC 的 URLnullread、writefalse
userIoTDB 的用户名rootread、writetrue
passwordIoTDB 的密码rootread、writetrue
sql用于指定查询的 SQL 语句nullreadtrue
numPartition在 read 中用于指定 DataFrame 的分区数,在 write 中用于设置写入并发数1read、writetrue
lowerBound查询的起始时间戳(包含)0readtrue
upperBound查询的结束时间戳(包含)0readtrue

从 IoTDB 读取数据

以下是一个示例,演示如何从 IoTDB 中读取数据成为 DataFrame。

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("user", "root")
  .option("password", "root")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // 查询 SQL
  .option("lowerBound", "0") // 时间戳下界
  .option("upperBound", "100000000") // 时间戳上界
  .option("numPartition", "5") // 分区数
  .load

df.printSchema()

df.show()

将数据写入 IoTDB

以下是一个示例,演示如何将数据写入 IoTDB。

// 构造窄表数据
val df = spark.createDataFrame(List(
  (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "Device")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")
  .withColumnRenamed("_5", "s2")
  .withColumnRenamed("_6", "s3")
  .withColumnRenamed("_7", "s4")
  .withColumnRenamed("_8", "s5")

// 写入窄表数据
dfWithColumn
  .write
  .format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save

// 构造宽表数据
val df = spark.createDataFrame(List(
  (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
  .withColumnRenamed("_3", "root.test.d0.s1")
  .withColumnRenamed("_4", "root.test.d0.s2")
  .withColumnRenamed("_5", "root.test.d0.s3")
  .withColumnRenamed("_6", "root.test.d0.s4")
  .withColumnRenamed("_7", "root.test.d0.s5")

// 写入宽表数据
dfWithColumn.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10")
  .save

宽表与窄表转换

以下是如何转换宽表与窄表的示例:

  • 从宽到窄
import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.** where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
  • 从窄到宽
import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)

宽表与窄表

以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

TsFile 中的现有数据如下:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02
timed1.statustimed1.temperaturetimed2.hardwaretimed2.status
1True12.22"aaa"1True
3True22.24"bbb"2False
5False32.16"ccc"4True

宽(默认)表形式如下:

Timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

你还可以使用窄表形式,如下所示:

TimeDevicestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.