Spark-IoTDB User Guide

Supported Versions

Supported versions of Spark and Scala are as follows:

| Spark Version | Scala Version |
| ------------- | ------------- |
| 2.4.0-latest  | 2.11, 2.12    |

Precautions

  1. The current version of spark-iotdb-connector supports Scala 2.11 and 2.12; Scala 2.13 is not supported.
  2. spark-iotdb-connector can be used from Java, Scala, and PySpark.

Deployment

spark-iotdb-connector has two use cases: IDE development and spark-shell debugging.

IDE Development

For IDE development, simply add the following dependency to the pom.xml file:

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <!-- spark-iotdb-connector_2.11 or spark-iotdb-connector_2.12.10, matching your Scala version -->
      <artifactId>spark-iotdb-connector_2.12.10</artifactId>
      <version>${iotdb.version}</version>
    </dependency>

spark-shell Debugging

To use spark-iotdb-connector in spark-shell, download the with-dependencies version of the jar
from the official website, then copy it into the ${SPARK_HOME}/jars directory with the following command:

cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

Usage

Parameters

| Parameter    | Description                                                                             | Default Value | Scope       | Can be Empty |
| ------------ | --------------------------------------------------------------------------------------- | ------------- | ----------- | ------------ |
| url          | Specifies the JDBC URL of IoTDB                                                         | null          | read, write | false        |
| user         | The username of IoTDB                                                                   | root          | read, write | true         |
| password     | The password of IoTDB                                                                   | root          | read, write | true         |
| sql          | Specifies the SQL statement for querying                                                | null          | read        | true         |
| numPartition | The number of DataFrame partitions when reading, and the write concurrency when writing | 1             | read, write | true         |
| lowerBound   | The start timestamp of the query (inclusive)                                            | 0             | read        | true         |
| upperBound   | The end timestamp of the query (inclusive)                                              | 0             | read        | true         |
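The connector handles partitioning internally, but the interplay of numPartition, lowerBound, and upperBound can be sketched in plain Scala: the inclusive time range is divided into numPartition sub-ranges, each backing one partition's query. splitTimeRange below is a hypothetical helper for illustration only, not part of the connector API.

```scala
// Hypothetical sketch (not connector API): split the inclusive time range
// [lower, upper] into numPartition sub-ranges, one per DataFrame partition.
def splitTimeRange(lower: Long, upper: Long, numPartition: Int): Seq[(Long, Long)] = {
  val span = upper - lower + 1                // both bounds are inclusive
  val step = math.max(1L, span / numPartition)
  (0 until numPartition).map { i =>
    val start = lower + i * step
    // the last partition absorbs any remainder
    val end = if (i == numPartition - 1) upper else start + step - 1
    (start, end)
  }
}
```

For example, splitTimeRange(0L, 99L, 4) yields (0,24), (25,49), (50,74), (75,99).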

Reading Data from IoTDB

Here is an example that demonstrates how to read data from IoTDB into a DataFrame:

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("user", "root")
  .option("password", "root")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root") // query SQL
  .option("lowerBound", "0") // lower timestamp bound
  .option("upperBound", "100000000") // upper timestamp bound
  .option("numPartition", "5") // number of partitions
  .load

df.printSchema()

df.show()

Writing Data to IoTDB

Here is an example that demonstrates how to write data to IoTDB:

// Construct narrow table data
val df = spark.createDataFrame(List(
  (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "Device")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")
  .withColumnRenamed("_5", "s2")
  .withColumnRenamed("_6", "s3")
  .withColumnRenamed("_7", "s4")
  .withColumnRenamed("_8", "s5")

// Write narrow table data
dfWithColumn
  .write
  .format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save

// Construct wide table data
val df = spark.createDataFrame(List(
  (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
  .withColumnRenamed("_3", "root.test.d0.s1")
  .withColumnRenamed("_4", "root.test.d0.s2")
  .withColumnRenamed("_5", "root.test.d0.s3")
  .withColumnRenamed("_6", "root.test.d0.s4")
  .withColumnRenamed("_7", "root.test.d0.s5")

// Write wide table data
dfWithColumn.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10")
  .save
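The connector performs the writes itself; nothing below is part of its API. Conceptually, though, each narrow-table row maps to roughly one IoTDB INSERT for its (Time, Device) pair. rowToInsert is a hypothetical helper that only illustrates that mapping, with simplified value formatting.

```scala
// Hypothetical illustration (not connector API): render one narrow-table row
// as the IoTDB INSERT statement it conceptually corresponds to.
def rowToInsert(time: Long, device: String, values: Map[String, Any]): String = {
  val (meas, vals) = values.toSeq.sortBy(_._1).unzip
  val rendered = vals.map {
    case s: String => "'" + s + "'"   // quote TEXT values
    case v         => v.toString
  }
  s"insert into $device(timestamp, ${meas.mkString(", ")}) " +
    s"values ($time, ${rendered.mkString(", ")})"
}
```

For instance, the first narrow row above corresponds to something like `insert into root.test.d0(timestamp, s0, ...) values (1, 1, ...)`.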

Wide and Narrow Table Conversion

Here are examples of how to convert between wide and narrow tables:

  • From wide to narrow:

import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.** where time < 1100 and time > 1000")
  .load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)

  • From narrow to wide:

import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)
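Transformer.toNarrowForm and Transformer.toWideForm operate on DataFrames, but the reshaping they perform can be illustrated on plain Scala collections. toNarrow below is a hypothetical stand-in, not the connector's implementation: it splits each fully-qualified wide column into (device, measurement) and emits one narrow row per (time, device).

```scala
// Hypothetical stand-in for Transformer.toNarrowForm, on plain collections.
// header: fully-qualified wide columns, e.g. "root.test.d0.s0"
// rows:   (time, values aligned with header); None models a null cell
def toNarrow(header: Seq[String], rows: Seq[(Long, Seq[Option[Any]])])
    : Seq[(Long, String, Map[String, Any])] = {
  // split "root.test.d0.s0" into (device = "root.test.d0", measurement = "s0")
  val cols = header.map { c =>
    val i = c.lastIndexOf('.')
    (c.substring(0, i), c.substring(i + 1))
  }
  rows.flatMap { case (time, values) =>
    cols.zip(values)
      .collect { case ((dev, meas), Some(v)) => (dev, meas, v) } // drop nulls
      .groupBy(_._1)                                             // one row per device
      .toSeq
      .map { case (dev, ms) => (time, dev, ms.map(t => t._2 -> t._3).toMap) }
  }
}
```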

Wide and Narrow Tables

Using the TsFile structure as an example: there are three measurements in the TsFile schema,
namely Status, Temperature, and Hardware. The basic information for each of these three measurements is as
follows:

| Name        | Type    | Encoding |
| ----------- | ------- | -------- |
| Status      | Boolean | PLAIN    |
| Temperature | Float   | RLE      |
| Hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows:

  • d1: root.ln.wf01.wt01
  • d2: root.ln.wf02.wt02

| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
| ---- | --------- | ---- | -------------- | ---- | ----------- | ---- | --------- |
| 1    | True      | 1    | 2.2            | 2    | "aaa"       | 1    | True      |
| 3    | True      | 2    | 2.2            | 4    | "bbb"       | 2    | False     |
| 5    | False     | 3    | 2.1            | 6    | "ccc"       | 4    | True      |

The wide (default) table form is as follows:

| Time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
| ---- | ---- | ----- | ---- | ---- | ----- | ---- |
| 1    | null | true  | null | 2.2  | true  | null |
| 2    | null | false | aaa  | 2.2  | null  | null |
| 3    | null | null  | null | 2.1  | true  | null |
| 4    | null | true  | bbb  | null | null  | null |
| 5    | null | null  | null | null | false | null |
| 6    | null | null  | ccc  | null | null  | null |

You can also use the narrow table format as shown below:

| Time | Device            | status | hardware | temperature |
| ---- | ----------------- | ------ | -------- | ----------- |
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |
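Going the other way, the narrow rows above fold back into the wide shape by prefixing each measurement name with its device path. Again a hypothetical sketch on plain Scala collections, not Transformer.toWideForm itself:

```scala
// Hypothetical stand-in for Transformer.toWideForm, on plain collections.
// Each narrow row (time, device, measurements) contributes columns named
// "<device>.<measurement>" to the wide row for that time.
def toWide(rows: Seq[(Long, String, Map[String, Any])]): Map[Long, Map[String, Any]] =
  rows.groupBy(_._1).map { case (time, rs) =>
    time -> rs.flatMap { case (_, dev, ms) =>
      ms.map { case (m, v) => s"$dev.$m" -> v }
    }.toMap
  }
```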

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation
