Apache Spark
Supported Versions
Supported versions of Spark and Scala are as follows:
| Spark Version | Scala Version | 
|---|---|
| 2.4.0-latest | 2.11, 2.12 | 
Precautions
- The current version of spark-iotdb-connector supports Scala 2.11 and 2.12, but not 2.13. You can check which Scala version your Spark build uses as shown below.
- spark-iotdb-connector can be used from Spark with Java, Scala, and PySpark.
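To find out which Scala version your Spark build uses (and therefore which connector artifact to pick), you can run the following inside spark-shell:

// Prints something like "version 2.12.10"; choose the connector artifact whose suffix matches.
println(scala.util.Properties.versionString)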
Deployment
spark-iotdb-connector has two use cases: IDE development and spark-shell debugging.
IDE Development
For IDE development, simply add the following dependency to the pom.xml file:
    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <!-- spark-iotdb-connector_2.11 or spark-iotdb-connector_2.12.10, matching your Scala version -->
      <artifactId>spark-iotdb-connector_2.12.10</artifactId>
      <version>${iotdb.version}</version>
    </dependency>
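If you build with sbt instead of Maven, the equivalent coordinates would look roughly like this (a sketch; replace the placeholder with the actual IoTDB version):

// Placeholder version string; substitute the IoTDB release you target.
libraryDependencies += "org.apache.iotdb" % "spark-iotdb-connector_2.12.10" % "<iotdb.version>"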
spark-shell Debugging
To use spark-iotdb-connector in spark-shell, download the with-dependencies version of the jar package from the official website, then copy it into the ${SPARK_HOME}/jars directory:

cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

In addition, to ensure that Spark can connect to IoTDB via JDBC, you need to do the following:
Run the following command to compile the IoTDB JDBC connector:
mvn clean package -pl iotdb-client/jdbc -am -DskipTests -P get-jar-with-dependencies

The compiled jar package is located in the following directory:

$IoTDB_HOME/iotdb-client/jdbc/target/iotdb-jdbc-{version}-SNAPSHOT-jar-with-dependencies.jar

Finally, copy this jar package to the ${SPARK_HOME}/jars directory as well:

cp iotdb-jdbc-{version}-SNAPSHOT-jar-with-dependencies.jar $SPARK_HOME/jars/
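To verify the deployment, start spark-shell and run a minimal read (a sketch, assuming a local IoTDB instance on 127.0.0.1:6667 with the default root/root credentials):

import org.apache.iotdb.spark.db._

// If this prints a schema instead of throwing ClassNotFoundException, both jars were picked up.
val probe = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("user", "root")
  .option("password", "root")
  .option("sql", "select ** from root")
  .load
probe.printSchema()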
Usage
Parameters
| Parameter | Description | Default Value | Scope | Can be Empty | 
|---|---|---|---|---|
| url | Specifies the JDBC URL of IoTDB | null | read, write | false | 
| user | The username of IoTDB | root | read, write | true | 
| password | The password of IoTDB | root | read, write | true | 
| sql | Specifies the SQL statement for querying | null | read | true | 
| numPartition | The number of DataFrame partitions when reading, and the write concurrency when writing | 1 | read, write | true | 
| lowerBound | The start timestamp of the query (inclusive) | 0 | read | true | 
| upperBound | The end timestamp of the query (inclusive) | 0 | read | true | 
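When reading, lowerBound, upperBound, and numPartition work together: the queried time range is split into numPartition partitions of the DataFrame. The sketch below illustrates one plausible even split (illustrative only; the connector's exact boundary arithmetic may differ):

// Illustrative even split of [lowerBound, upperBound] into numPartition time ranges.
val lowerBound = 0L
val upperBound = 100000000L
val numPartition = 5
val step = (upperBound - lowerBound + 1) / numPartition
val ranges = (0 until numPartition).map { i =>
  val start = lowerBound + i * step
  val end = if (i == numPartition - 1) upperBound else start + step - 1
  (start, end)
}
ranges.foreach { case (s, e) => println(s"partition time range: [$s, $e]") }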
Reading Data from IoTDB
Here is an example that demonstrates how to read data from IoTDB into a DataFrame:
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("user", "root")
  .option("password", "root")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // query SQL
  .option("lowerBound", "0") // lower timestamp bound
  .option("upperBound", "100000000") // upper timestamp bound
  .option("numPartition", "5") // number of partitions
  .load
df.printSchema()
df.show()
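Once loaded, the DataFrame behaves like any other Spark source. For example, you can register it as a temporary view and query it with Spark SQL (the view name here is illustrative):

df.createOrReplaceTempView("iotdb_table")
spark.sql("select * from iotdb_table where Time > 50000000").show()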
Writing Data to IoTDB
Here is an example that demonstrates how to write data to IoTDB:
// Construct narrow table data
val df = spark.createDataFrame(List(
  (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "Device")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")
  .withColumnRenamed("_5", "s2")
  .withColumnRenamed("_6", "s3")
  .withColumnRenamed("_7", "s4")
  .withColumnRenamed("_8", "s5")
// Write narrow table data
dfWithColumn
  .write
  .format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save
// Construct wide table data
val df = spark.createDataFrame(List(
  (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
  .withColumnRenamed("_3", "root.test.d0.s1")
  .withColumnRenamed("_4", "root.test.d0.s2")
  .withColumnRenamed("_5", "root.test.d0.s3")
  .withColumnRenamed("_6", "root.test.d0.s4")
  .withColumnRenamed("_7", "root.test.d0.s5")
// Write wide table data
dfWithColumn.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10")
  .save
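Note the difference between the two layouts above: a narrow-table DataFrame carries a Time column, a Device column, and one column per measurement (s0 through s5 here), while a wide-table DataFrame carries a Time column followed by one column per full timeseries path.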
Wide and Narrow Table Conversion
Here are examples of how to convert between wide and narrow tables:
- From wide to narrow
 
import org.apache.iotdb.spark.db._
val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.** where time < 1100 and time > 1000")
  .load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
- From narrow to wide
 
import org.apache.iotdb.spark.db._
val wide_df = Transformer.toWideForm(spark, narrow_df)
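Putting the two APIs together, here is an end-to-end sketch (reusing only the calls shown above): read a time slice in the default wide form, convert it to the narrow form, and write the narrow DataFrame back to IoTDB.

import org.apache.iotdb.spark.db._

// Read a time slice in the default wide form.
val wide = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.** where time < 1100 and time > 1000")
  .load
// Convert to narrow form: Time, Device, one column per measurement.
val narrow = Transformer.toNarrowForm(spark, wide)
// Write the narrow DataFrame back.
narrow.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save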
Wide and Narrow Tables
Using the TsFile structure as an example: there are three measurements in the TsFile schema,
 namely Status, Temperature, and Hardware. The basic information for each of these three measurements is as
 follows:
| Name | Type | Encoding | 
|---|---|---|
| Status | Boolean | PLAIN | 
| Temperature | Float | RLE | 
| Hardware | Text | PLAIN | 
The existing data in the TsFile is as follows:
d1: root.ln.wf01.wt01, d2: root.ln.wf02.wt02
| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status | 
|---|---|---|---|---|---|---|---|
| 1 | True | 1 | 2.2 | 2 | "aaa" | 1 | True | 
| 3 | True | 2 | 2.2 | 4 | "bbb" | 2 | False | 
| 5 | False | 3 | 2.1 | 6 | "ccc" | 4 | True | 
The wide (default) table form is as follows:
| Time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware | 
|---|---|---|---|---|---|---|
| 1 | null | true | null | 2.2 | true | null | 
| 2 | null | false | aaa | 2.2 | null | null | 
| 3 | null | null | null | 2.1 | true | null | 
| 4 | null | true | bbb | null | null | null | 
| 5 | null | null | null | null | false | null | 
| 6 | null | null | ccc | null | null | null | 
You can also use the narrow table format as shown below:
| Time | Device | status | hardware | temperature | 
|---|---|---|---|---|
| 1 | root.ln.wf01.wt01 | true | null | 2.2 | 
| 1 | root.ln.wf02.wt02 | true | null | null | 
| 2 | root.ln.wf01.wt01 | null | null | 2.2 | 
| 2 | root.ln.wf02.wt02 | false | aaa | null | 
| 3 | root.ln.wf01.wt01 | true | null | 2.1 | 
| 4 | root.ln.wf02.wt02 | true | bbb | null | 
| 5 | root.ln.wf01.wt01 | false | null | null | 
| 6 | root.ln.wf02.wt02 | null | ccc | null | 
