
Spark-IoTDB

Versions

The required versions of Spark and Java are as follows:

| Spark Version | Scala Version | Java Version | TsFile |
|---------------|---------------|--------------|--------|
| 2.4.0-3.2.0   | 2.12          | 1.8          | 0.13.0 |

Notes

  1. The Spark IoTDB Connector only supports Spark 2.4.0 to 3.2.0 with Scala 2.12.
    If you need support for other versions, you can modify the Scala version in the pom file of the spark-iotdb-connector module in the source code and then recompile.

  2. Because the thrift versions of IoTDB and Spark conflict, you need to run `rm -f $SPARK_HOME/jars/libthrift*` and then `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
    Otherwise, you can only debug the code in an IDE. And if you want to submit jobs with the spark-submit command, you must include the dependencies when packaging.

Installation

mvn clean scala:compile compile install

Maven Dependency

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>spark-iotdb-connector</artifactId>
      <version>0.13.0</version>
    </dependency>
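
If you build with sbt instead of Maven, the same artifact can be declared as follows (a sketch mirroring the Maven coordinates above; note the artifact ID carries no Scala-version suffix, so `%` rather than `%%` is used):

// build.sbt — assumed sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.iotdb" % "spark-iotdb-connector" % "0.13.0"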

Spark-shell User Guide

spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .load

df.printSchema()

df.show()
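
Since the result is an ordinary Spark DataFrame, you can also register it as a temporary view and query it with Spark SQL. A minimal sketch, assuming the wide-form schema shown in the Schema Inference section below (the view name iotdb_table is illustrative):

// register the IoTDB DataFrame as a temporary view for Spark SQL
df.createOrReplaceTempView("iotdb_table")

// wide-form column names are full IoTDB paths, so escape them with backticks
spark.sql("select * from iotdb_table where `root.ln.wf01.wt01.temperature` > 2.0").show()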

If you want to partition the RDD, you can do the following:

spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", [lower bound of the time range you want to query (inclusive)])
  .option("upperBound", [upper bound of the time range you want to query (inclusive)])
  .option("numPartition", [the number of partitions you want])
  .load

df.printSchema()

df.show()
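
For instance, to split a scan of the time range [0, 100000] into four partitions, the options might be filled in as follows (the bounds and partition count here are illustrative values, not defaults):

import org.apache.iotdb.spark.db._

// a sketch with illustrative values for the partitioning options
val partitioned_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", "0")       // inclusive lower bound of the time range
  .option("upperBound", "100000")  // inclusive upper bound of the time range
  .option("numPartition", "4")     // number of partitions for the scan
  .load

partitioned_df.rdd.getNumPartitions  // expected to be 4 with these options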

Schema Inference

Take the following TsFile structure as an example: the TsFile schema contains three measurements: status, temperature, and hardware. Basic information about these three measurements is as follows:

| Name        | Type    | Encoding |
|-------------|---------|----------|
| status      | Boolean | PLAIN    |
| temperature | Float   | RLE      |
| hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02

| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
|------|-----------|------|----------------|------|-------------|------|-----------|
| 1    | True      | 1    | 2.2            | 2    | "aaa"       | 1    | True      |
| 3    | True      | 2    | 2.2            | 4    | "bbb"       | 2    | False     |
| 5    | False     | 3    | 2.1            | 6    | "ccc"       | 4    | True      |

The wide (default) table form is as follows:

| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
|------|-------------------------------|--------------------------|----------------------------|-------------------------------|--------------------------|----------------------------|
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |

You can also use the narrow table form, as shown below (you can see Part 4 to learn how to use the narrow form):

| Time | device_name       | status | hardware | temperature |
|------|-------------------|--------|----------|-------------|
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |

Transforming Between the Wide and Narrow Table Forms

  • From wide to narrow
import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root where time < 1100 and time > 1000")
  .load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
  • From narrow to wide
import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)
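
Putting the two helpers together, a round trip from the wide form to the narrow form and back might look like this sketch (names such as restored_df are illustrative):

import org.apache.iotdb.spark.db._

// read in the default wide form: one column per full series path
val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .load

// wide -> narrow: one row per (time, device_name) pair
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
narrow_df.show()

// narrow -> wide: back to one column per series
val restored_df = Transformer.toWideForm(spark, narrow_df)
restored_df.show()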

Java User Guide

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.iotdb.spark.db.*;

public class Example {

  public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Build a DataFrame from Scratch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
        .option("url","jdbc:iotdb://127.0.0.1:6667/")
        .option("sql","select * from root").load();

    df.printSchema();

    df.show();
    
    Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
    narrowTable.show();
  }
}

Writing Data to IoTDB

User Guide

// write data in the narrow table form
val df = spark.createDataFrame(List(
      (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "device_name")
    .withColumnRenamed("_3", "s0")
    .withColumnRenamed("_4", "s1")
    .withColumnRenamed("_5", "s2")
    .withColumnRenamed("_6", "s3")
    .withColumnRenamed("_7", "s4")
    .withColumnRenamed("_8", "s5")
dfWithColumn
    .write
    .format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .save
    
// write data in the wide table form
val df = spark.createDataFrame(List(
      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "root.test.d0.s0")
    .withColumnRenamed("_3", "root.test.d0.s1")
    .withColumnRenamed("_4", "root.test.d0.s2")
    .withColumnRenamed("_5", "root.test.d0.s3")
    .withColumnRenamed("_6", "root.test.d0.s4")
    .withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
	.option("numPartition", "10")
    .save

Notes

  1. You can write data to IoTDB directly whether the DataFrame holds a narrow table or a wide table (a read-back check is sketched after these notes).
  2. The numPartition parameter sets the number of partitions: the DataFrame is repartitioned before the data is written, and each partition opens its own session to write data, which increases write concurrency.
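
As mentioned in note 1, a quick way to check either write is to read the device back; a minimal sketch against the root.test.d0 device used in the examples above:

import org.apache.iotdb.spark.db._

// read back the series written under root.test.d0
val check_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.test.d0")
  .load

check_df.show()  // expect the two rows written above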
