2023年7月10日大约 3 分钟
Spark-IoTDB
版本
Spark和Java所需的版本如下:
Spark Version | Scala Version | Java Version | TsFile |
---|---|---|---|
2.4.0-3.2.0 | 2.12 | 1.8 | 0.13.0 |
注意
Spark IoTDB Connector只支持
Spark 2.4.0
到Spark 3.2.0
的Scala 2.12
版本。
如果需要对其他版本进行支持,可以通过修改源码中spark-iotdb-connector
这个模块里面pom文件的Scala版本之后进行重新编译。因为IoTDB与Spark的thrift版本有冲突,所以需要通过执行
rm -f $SPARK_HOME/jars/libthrift*
和cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/
这两个命令来解决。
否则的话,就只能在IDE里面进行代码调试。而且如果你需要通过spark-submit
命令提交任务的话,你打包时必须要带上依赖。
安装
mvn clean scala:compile compile install
Maven依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>spark-iotdb-connector</artifactId>
<version>0.13.0</version>
</dependency>
Spark-shell用户指南
spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").load
df.printSchema()
df.show()
如果要对rdd进行分区,可以执行以下操作
spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").
option("lowerBound", [lower bound of time that you want query(include)]).option("upperBound", [upper bound of time that you want query(include)]).
option("numPartition", [the partition number you want]).load
df.printSchema()
df.show()
模式推断
以下TsFile结构为例:TsFile模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:
名称 | 类型 | 编码 |
---|---|---|
状态 | Boolean | PLAIN |
温度 | Float | RLE |
硬件 | Text | PLAIN |
TsFile中的现有数据如下:
- d1:root.ln.wf01.wt01
- d2:root.ln.wf02.wt02
time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
---|---|---|---|---|---|---|---|
1 | True | 1 | 2.2 | 2 | "aaa" | 1 | True |
3 | True | 2 | 2.2 | 4 | "bbb" | 2 | False |
5 | False | 3 | 2.1 | 6 | "ccc" | 4 | True |
宽(默认)表形式如下:
time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
---|---|---|---|---|---|---|
1 | null | true | null | 2.2 | true | null |
2 | null | false | aaa | 2.2 | null | null |
3 | null | null | null | 2.1 | true | null |
4 | null | true | bbb | null | null | null |
5 | null | null | null | null | false | null |
6 | null | null | ccc | null | null | null |
你还可以使用窄表形式,如下所示:(您可以参阅第4部分,了解如何使用窄表形式)
时间 | 设备名 | 状态 | 硬件 | 温度 |
---|---|---|---|---|
1 | root.ln.wf02.wt01 | true | null | 2.2 |
1 | root.ln.wf02.wt02 | true | null | null |
2 | root.ln.wf02.wt01 | null | null | 2.2 |
2 | root.ln.wf02.wt02 | false | aaa | null |
3 | root.ln.wf02.wt01 | true | null | 2.1 |
4 | root.ln.wf02.wt02 | true | bbb | null |
5 | root.ln.wf02.wt01 | false | null | null |
6 | root.ln.wf02.wt02 | null | ccc | null |
在宽和窄表之间转换
- 从宽到窄
import org.apache.iotdb.spark.db._
val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.** where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
- 从窄到宽
import org.apache.iotdb.spark.db._
val wide_df = Transformer.toWideForm(spark, narrow_df)
Java用户指南
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.iotdb.spark.db.*;
public class Example {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Build a DataFrame from Scratch")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
.option("url","jdbc:iotdb://127.0.0.1:6667/")
.option("sql","select * from root").load();
df.printSchema();
df.show();
Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
narrowTable.show();
}
}
写数据到IoTDB
用户指南
// import narrow table
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
dfWithColumn
.write
.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
// import wide table
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.s0")
.withColumnRenamed("_3", "root.test.d0.s1")
.withColumnRenamed("_4", "root.test.d0.s2")
.withColumnRenamed("_5", "root.test.d0.s3")
.withColumnRenamed("_6", "root.test.d0.s4")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("numPartition", "10")
.save
注意
- 无论dataframe中存放的是窄表还是宽表,都可以直接将数据写到IoTDB中。
- numPartition参数是用来设置分区数,会在写入数据之前给dataframe进行重分区。每一个分区都会开启一个session进行数据的写入,来提高并发数。