跳至主要內容

Spark-IoTDB 用户手册

...大约 3 分钟

Spark-IoTDB 用户手册

版本支持

支持的 Spark 与 Scala 版本如下:

Spark 版本Scala 版本
2.4.0-latest2.11, 2.12

注意事项

  1. 当前版本的 spark-iotdb-connector 支持 2.112.12 两个版本的 Scala,暂不支持 2.13 版本。
  2. spark-iotdb-connector 支持在 Java、Scala 版本的 Spark 与 PySpark 中使用。

部署

spark-iotdb-connector 总共有两个使用场景,分别为 IDE 开发与 spark-shell 调试。

IDE 开发

在 IDE 开发时,只需要在 pom.xml 文件中添加以下依赖即可:

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <!-- spark-iotdb-connector_2.11 or spark-iotdb-connector_2.13 -->
      <artifactId>spark-iotdb-connector_2.12.10</artifactId>
      <version>${iotdb.version}</version>
    </dependency>

spark-shell 调试

如果需要在 spark-shell 中使用 spark-iotdb-connetcor,需要先在官网下载 with-dependencies 版本的 jar 包。然后再将 Jar 包拷贝到 ${SPARK_HOME}/jars 目录中即可。
执行以下命令即可:

cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

使用

参数

参数描述默认值使用范围能否为空
url指定 IoTDB 的 JDBC 的 URLnullread、writefalse
userIoTDB 的用户名rootread、writetrue
passwordIoTDB 的密码rootread、writetrue
sql用于指定查询的 SQL 语句nullreadtrue
numPartition在 read 中用于指定 DataFrame 的分区数,在 write 中用于设置写入并发数1read、writetrue
lowerBound查询的起始时间戳(包含)0readtrue
upperBound查询的结束时间戳(包含)0readtrue

从 IoTDB 读取数据

以下是一个示例,演示如何从 IoTDB 中读取数据成为 DataFrame。

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("user", "root")
  .option("password", "root")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // 查询 SQL
  .option("lowerBound", "0") // 时间戳下界
  .option("upperBound", "100000000") // 时间戳上界
  .option("numPartition", "5") // 分区数
  .load

df.printSchema()

df.show()

将数据写入 IoTDB

以下是一个示例,演示如何将数据写入 IoTDB。

// 构造窄表数据
val df = spark.createDataFrame(List(
  (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "Device")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")
  .withColumnRenamed("_5", "s2")
  .withColumnRenamed("_6", "s3")
  .withColumnRenamed("_7", "s4")
  .withColumnRenamed("_8", "s5")

// 写入窄表数据
dfWithColumn
  .write
  .format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save

// 构造宽表数据
val df = spark.createDataFrame(List(
  (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
  .withColumnRenamed("_3", "root.test.d0.s1")
  .withColumnRenamed("_4", "root.test.d0.s2")
  .withColumnRenamed("_5", "root.test.d0.s3")
  .withColumnRenamed("_6", "root.test.d0.s4")
  .withColumnRenamed("_7", "root.test.d0.s5")

// 写入宽表数据
dfWithColumn.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10")
  .save

宽表与窄表转换

以下是如何转换宽表与窄表的示例:

  • 从宽到窄
import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.** where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
  • 从窄到宽
import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)

宽表与窄表

以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

TsFile 中的现有数据如下:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02
timed1.statustimed1.temperaturetimed2.hardwaretimed2.status
1True12.22"aaa"1True
3True22.24"bbb"2False
5False32.16"ccc"4True

宽(默认)表形式如下:

Timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

你还可以使用窄表形式,如下所示:

TimeDevicestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

Copyright © 2024 The Apache Software Foundation.
Apache IoTDB, IoTDB, Apache, the Apache feather logo, and the Apache IoTDB project logo are either registered trademarks or trademarks of The Apache Software Foundation in all countries

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.