# Spark-IoTDB

# Versions

The versions of Spark, Scala, Java, and TsFile required are as follows:

| Spark Version | Scala Version | Java Version | TsFile |
| ------------- | ------------- | ------------ | ------ |
| 2.4.5         | 2.12          | 1.8          | 0.12.0 |

Currently, only Spark 2.4.3 is supported, and there are known issues on 2.4.7; do not use it.

# Install

```
mvn clean scala:compile compile install
```

# Maven Dependency

```xml
<dependency>
  <groupId>org.apache.iotdb</groupId>
  <artifactId>spark-iotdb-connector</artifactId>
  <version>0.12.5</version>
</dependency>
```
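If your project builds with sbt instead, the equivalent dependency (assuming the artifact is installed in your local repository or available from a reachable one) would be:

```scala
libraryDependencies += "org.apache.iotdb" % "spark-iotdb-connector" % "0.12.5"
```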

# spark-shell User Guide

```scala
spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .load

df.printSchema()

df.show()
```
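Once loaded, `df` is an ordinary Spark DataFrame. As a minimal follow-up sketch using only standard Spark APIs (the view name `iotdb_table` is an illustrative choice), you can register it as a temporary view and query it with Spark SQL:

```scala
// Register the DataFrame as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("iotdb_table")
spark.sql("select * from iotdb_table limit 10").show()
```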

To partition the RDD:

```scala
spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", [lower bound of the time range you want to query (inclusive)])
  .option("upperBound", [upper bound of the time range you want to query (inclusive)])
  .option("numPartition", [the number of partitions you want])
  .load

df.printSchema()

df.show()
```
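For example, here is a sketch with concrete, illustrative values that queries timestamps 1 through 10000 (inclusive) across 4 partitions:

```scala
val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", "1")      // inclusive lower time bound (illustrative value)
  .option("upperBound", "10000")  // inclusive upper time bound (illustrative value)
  .option("numPartition", "4")    // requested partition count (illustrative value)
  .load

// The underlying RDD should reflect the requested partitioning
df.rdd.getNumPartitions
```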

# Schema Inference

Take the following TsFile structure as an example. There are three measurements in the TsFile schema: status, temperature, and hardware. The basic information about these three measurements is as follows:

| Name        | Type    | Encoding |
| ----------- | ------- | -------- |
| status      | Boolean | PLAIN    |
| temperature | Float   | RLE      |
| hardware    | Text    | PLAIN    |

The existing data in the TsFile, shown in the wide (default) table form, is as follows:

| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
| ---- | ----------------------------- | ------------------------ | -------------------------- | ----------------------------- | ------------------------ | -------------------------- |
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |

You can also use the narrow table form, shown below (see the Transform Between Wide and Narrow Tables section for how to obtain it):

| time | device_name       | status | hardware | temperature |
| ---- | ----------------- | ------ | -------- | ----------- |
| 1    | root.ln.wf02.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf02.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf02.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf02.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |
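The narrow form makes per-device processing straightforward, since the device becomes an ordinary column. A sketch using standard Spark APIs (it assumes a narrow-form DataFrame `narrow_df`, as produced in the next section):

```scala
import spark.implicits._

// All rows for one device where a hardware value is present
narrow_df.filter($"device_name" === "root.ln.wf02.wt02" && $"hardware".isNotNull).show()
```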

# Transform Between Wide and Narrow Tables

- From wide to narrow:

```scala
import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root where time < 1100 and time > 1000")
  .load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
```
- From narrow to wide:

```scala
import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)
```
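As a quick sanity check, converting wide to narrow and back should reproduce the wide layout (a sketch reusing the DataFrames from the snippets above):

```scala
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
val round_trip = Transformer.toWideForm(spark, narrow_df)
round_trip.show()
```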

# Java User Guide

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.iotdb.spark.db.*;

public class Example {

  public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Build a DataFrame from Scratch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
        .option("url", "jdbc:iotdb://127.0.0.1:6667/")
        .option("sql", "select * from root").load();

    df.printSchema();

    df.show();

    Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
    narrowTable.show();
  }
}
```
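To run a compiled application such as the class above, submit it with the connector jars on the classpath. A sketch (`your-app.jar` is a placeholder for your own build artifact; the jar names follow the spark-shell examples above):

```
spark-submit --class Example --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar your-app.jar
```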

# Write Data to IoTDB

# User Guide

```scala
// import narrow table
val df = spark.createDataFrame(List(
      (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "device_name")
    .withColumnRenamed("_3", "s0")
    .withColumnRenamed("_4", "s1")
    .withColumnRenamed("_5", "s2")
    .withColumnRenamed("_6", "s3")
    .withColumnRenamed("_7", "s4")
    .withColumnRenamed("_8", "s5")
dfWithColumn
    .write
    .format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .save

// import wide table
val df = spark.createDataFrame(List(
      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "root.test.d0.s0")
    .withColumnRenamed("_3", "root.test.d0.s1")
    .withColumnRenamed("_4", "root.test.d0.s2")
    .withColumnRenamed("_5", "root.test.d0.s3")
    .withColumnRenamed("_6", "root.test.d0.s4")
    .withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .option("numPartition", "10")
    .save
```
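To verify a write, you can read the series back through the same connector (a sketch reusing the read path from the spark-shell section):

```scala
val readBack = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.test.d0")
  .load

readBack.show()
```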

# Notes

1. You can write data directly to IoTDB whether the DataFrame contains a wide table or a narrow table.
2. The parameter `numPartition` sets the number of partitions. The DataFrame you want to save will be repartitioned according to this parameter before the data is written. Each partition opens its own session to write data, which increases the number of concurrent requests.
