Skip to main content

flink-sql-iotdb-connector

...About 8 min

flink-sql-iotdb-connector

The flink-sql-iotdb-connector seamlessly connects Flink SQL or Flink Table with IoTDB, enabling real-time read and write operations on IoTDB within Flink tasks. It can be applied to the following scenarios:

  1. Real-time data synchronization: Real-time synchronization of data from one database to another.
  2. Real-time data pipeline: Building real-time data processing pipelines to process and analyze data in databases.
  3. Real-time data analysis: Real-time analysis of data in databases, providing real-time business insights.
  4. Real-time applications: Real-time application of database data in real-time applications such as real-time reporting and real-time recommendations.
  5. Real-time monitoring: Real-time monitoring of database data, detecting anomalies and errors.

Read and Write Modes

Read Modes (Source)Write Modes (Sink)
Bounded Scan, Lookup, CDCStreaming Sink, Batch Sink

Read Modes (Source)

  • Bounded Scan: Bounded scan is primarily implemented by specifying the time series and optional upper and lower bounds of the query conditions to query data, and the query result usually consists of multiple rows of data. This type of query cannot retrieve data that is updated after the query.

  • Lookup: The lookup query mode differs from the scan query mode. While bounded scan queries data within a time range, the lookup query mode only queries data at a precise time point, resulting in a single row of data. Additionally, only the right table of a lookup join can use the lookup query mode.

  • CDC: CDC is mainly used in Flink's ETL tasks. When data in IoTDB changes, Flink can detect it through our provided CDC connector, and we can forward the detected change data to other external data sources to achieve the purpose of ETL.

Write Modes (Sink)

  • Streaming Sink: Used in Flink's streaming mode, it synchronizes the insert, update, and delete records of the Dynamic Table in Flink to IoTDB in real-time.

  • Batch Sink: Used in Flink's batch mode, it writes the batch computation results from Flink to IoTDB in a single operation.

Usage

We provide two ways to use the flink-sql-iotdb-connector. One is to reference it through Maven during project development, and the other is to use it in Flink's sql-client. We will introduce these two usage methods separately.

📌 Note: flink version requires 1.17.0 and above.

Maven

Simply add the following dependency to your project's pom file:

<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>flink-sql-iotdb-connector</artifactId>
    <version>${iotdb.version}</version>
</dependency>

sql-client

If you want to use the flink-sql-iotdb-connector in the sql-client, follow these steps to configure the environment:

  1. Download the flink-sql-iotdb-connector jar file with dependencies from the official websiteopen in new window.

  2. Copy the jar file to the $FLINK_HOME/lib directory.

  3. Start the Flink cluster.

  4. Start the sql-client.

You can now use the flink-sql-iotdb-connector in the sql-client.

Table Structure Specification

Regardless of the type of connector used, the following table structure specifications must be met:

  • For all tables using the IoTDB connector, the first column must be named Time_ and have a data type of BIGINT.
  • All column names, except for the Time_ column, must start with root.. Additionally, any node in the column name cannot be purely numeric. If there are purely numeric or other illegal characters in the column name, they must be enclosed in backticks. For example, the path root.sg.d0.123 is an illegal path, but root.sg.d0.123`` is a valid path.
  • When querying data from IoTDB using either pattern or sql, the time series names in the query result must include all column names in Flink, except for Time_. If there is no corresponding column name in the query result, that column will be filled with null.
  • The supported data types in flink-sql-iotdb-connector are: INT, BIGINT, FLOAT, DOUBLE, BOOLEAN, STRING. The data type of each column in Flink Table must match the corresponding time series type in IoTDB, otherwise an error will occur and the Flink task will exit.

The following examples illustrate the mapping between time series in IoTDB and columns in Flink Table.

Read Mode (Source)

Scan Table (Bounded)

Parameters

ParameterRequiredDefaultTypeDescription
nodeUrlsNo127.0.0.1:6667StringSpecifies the datanode addresses of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.
userNorootStringIoTDB username
passwordNorootStringIoTDB password
scan.bounded.lower-boundNo-1LLongLower bound (inclusive) of the timestamp for bounded scan queries. Valid when the parameter is greater than 0.
scan.bounded.upper-boundNo-1LLongUpper bound (inclusive) of the timestamp for bounded scan queries. Valid when the parameter is greater than 0.
sqlYesNoneStringQuery to be executed in IoTDB.

Example

This example demonstrates how to read data from IoTDB using the scan table method in a Flink Table Job:

Assume the data in IoTDB is as follows:

IoTDB> select ** from root;
+-----------------------------+-------------+-------------+-------------+
|                         Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00|    1.0833644|      2.34874|    1.2414109|
|1970-01-01T08:00:00.002+08:00|     4.929185|    3.1885583|    4.6980085|
|1970-01-01T08:00:00.003+08:00|    3.5206156|    3.5600138|    4.8080945|
|1970-01-01T08:00:00.004+08:00|    1.3449302|    2.8781595|    3.3195343|
|1970-01-01T08:00:00.005+08:00|    3.3079383|    3.3840187|    3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
import org.apache.flink.table.api.*;

public class BoundedScanTest {
  public static void main(String[] args) throws Exception {
    // setup table environment
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    // setup schema
    Schema iotdbTableSchema =
            Schema.newBuilder()
                    .column("Time_", DataTypes.BIGINT())
                    .column("root.sg.d0.s0", DataTypes.FLOAT())
                    .column("root.sg.d1.s0", DataTypes.FLOAT())
                    .column("root.sg.d1.s1", DataTypes.FLOAT())
                    .build();
    // register table
    TableDescriptor iotdbDescriptor =
            TableDescriptor.forConnector("IoTDB")
                    .schema(iotdbTableSchema)
                    .option("nodeUrls", "127.0.0.1:6667")
                    .option("sql", "select ** from root")
                    .build();
    tableEnv.createTemporaryTable("iotdbTable", iotdbDescriptor);

    // output table
    tableEnv.from("iotdbTable").execute().print();
  }
}

After executing the above job, the output table in the Flink console is as follows:

+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                Time_ |                  root.sg.d0.s0 |                  root.sg.d1.s0 |                  root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    1 |                      1.0833644 |                        2.34874 |                      1.2414109 |
| +I |                    2 |                       4.929185 |                      3.1885583 |                      4.6980085 |
| +I |                    3 |                      3.5206156 |                      3.5600138 |                      4.8080945 |
| +I |                    4 |                      1.3449302 |                      2.8781595 |                      3.3195343 |
| +I |                    5 |                      3.3079383 |                      3.3840187 |                      3.7278645 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+

Lookup Point

Parameters

ParameterRequiredDefaultTypeDescription
nodeUrlsNo127.0.0.1:6667StringSpecifies the addresses of the IoTDB datanode. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.
userNorootStringIoTDB username
passwordNorootStringIoTDB password
lookup.cache.max-rowsNo-1IntegerMaximum number of rows to cache for lookup queries. Effective when the parameter is greater than 0.
lookup.cache.ttl-secNo-1IntegerTime-to-live for cached data in lookup queries, in seconds.
sqlYesNoneStringSQL query to execute in IoTDB.

Example

This example demonstrates how to perform a lookup query using the device table in IoTDB as a dimension table:

  • Use the datagen connector to generate two fields as the left table for Lookup Join. The first field is an incrementing field representing the timestamp. The second field is a random field representing a measurement time series.
  • Register a table using the IoTDB connector as the right table for Lookup Join.
  • Join the two tables together.

The current data in IoTDB is as follows:

IoTDB> select ** from root;
+-----------------------------+-------------+-------------+-------------+
|                         Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00|    1.0833644|      2.34874|    1.2414109|
|1970-01-01T08:00:00.002+08:00|     4.929185|    3.1885583|    4.6980085|
|1970-01-01T08:00:00.003+08:00|    3.5206156|    3.5600138|    4.8080945|
|1970-01-01T08:00:00.004+08:00|    1.3449302|    2.8781595|    3.3195343|
|1970-01-01T08:00:00.005+08:00|    3.3079383|    3.3840187|    3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class LookupTest {
  public static void main(String[] args) {
    // Setup environment
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // Register left table
    Schema dataGenTableSchema =
            Schema.newBuilder()
                    .column("Time_", DataTypes.BIGINT())
                    .column("s0", DataTypes.INT())
                    .build();

    TableDescriptor datagenDescriptor =
            TableDescriptor.forConnector("datagen")
                    .schema(dataGenTableSchema)
                    .option("fields.Time_.kind", "sequence")
                    .option("fields.Time_.start", "1")
                    .option("fields.Time_.end", "5")
                    .option("fields.s0.min", "1")
                    .option("fields.s0.max", "1")
                    .build();
    tableEnv.createTemporaryTable("leftTable", datagenDescriptor);

    // Register right table
    Schema iotdbTableSchema =
            Schema.newBuilder()
                    .column("Time_", DataTypes.BIGINT())
                    .column("root.sg.d0.s0", DataTypes.FLOAT())
                    .column("root.sg.d1.s0", DataTypes.FLOAT())
                    .column("root.sg.d1.s1", DataTypes.FLOAT())
                    .build();

    TableDescriptor iotdbDescriptor =
            TableDescriptor.forConnector("IoTDB")
                    .schema(iotdbTableSchema)
                    .option("sql", "select ** from root")
                    .build();
    tableEnv.createTemporaryTable("rightTable", iotdbDescriptor);

    // Join
    String sql =
            "SELECT l.Time_, l.s0, r.`root.sg.d0.s0`, r.`root.sg.d1.s0`, r.`root.sg.d1.s1` "
                    + "FROM (SELECT *, PROCTIME() AS proc_time FROM leftTable) AS l "
                    + "JOIN rightTable FOR SYSTEM_TIME AS OF l.proc_time AS r "
                    + "ON l.Time_ = r.Time_";

    // Output table
    tableEnv.sqlQuery(sql).execute().print();
  }
}

After executing the above task, the output table in Flink's console is as follows:

+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| op |                Time_ |          s0 | root.sg.d0.s0 |        root.sg.d1.s0 |                  root.sg.d1.s1 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| +I |                    5 |           1 |     3.3079383 |            3.3840187 |                      3.7278645 |
| +I |                    2 |           1 |      4.929185 |            3.1885583 |                      4.6980085 |
| +I |                    1 |           1 |     1.0833644 |              2.34874 |                      1.2414109 |
| +I |                    4 |           1 |     1.3449302 |            2.8781595 |                      3.3195343 |
| +I |                    3 |           1 |     3.5206156 |            3.5600138 |                      4.8080945 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+

CDC

Parameters

ParameterRequiredDefaultTypeDescription
nodeUrlsNo127.0.0.1:6667StringSpecifies the datanode address of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.
userNorootStringIoTDB username
passwordNorootStringIoTDB password
modeYesBOUNDEDENUMThis parameter must be set to CDC in order to start
sqlYesNoneStringSQL query to be executed in IoTDB
cdc.portNo8080IntegerPort number for the CDC service in IoTDB
cdc.task.nameopen in new windowYesNoneStringRequired when the mode parameter is set to CDC. Used to create a Pipe task in IoTDB.
cdc.patternYesNoneStringRequired when the mode parameter is set to CDC. Used as a filtering condition for sending data in IoTDB.

Example

This example demonstrates how to retrieve the changing data from a specific path in IoTDB using the CDC Connector:

  • Create a CDC table using the CDC Connector.
  • Print the CDC table.
import org.apache.flink.table.api.*;

public class CDCTest {
  public static void main(String[] args) {
    // setup environment
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    // setup schema
    Schema iotdbTableSchema = Schema
            .newBuilder()
            .column("Time_", DataTypes.BIGINT())
            .column("root.sg.d0.s0", DataTypes.FLOAT())
            .column("root.sg.d1.s0", DataTypes.FLOAT())
            .column("root.sg.d1.s1", DataTypes.FLOAT())
            .build();

    // register table
    TableDescriptor iotdbDescriptor = TableDescriptor
            .forConnector("IoTDB")
            .schema(iotdbTableSchema)
            .option("mode", "CDC")
            .option("cdc.task.name", "test")
            .option("cdc.pattern", "root.sg")
            .build();
    tableEnv.createTemporaryTable("iotdbTable", iotdbDescriptor);

    // output table
    tableEnv.from("iotdbTable").execute().print();
  }
}

Run the above Flink CDC task and execute the following SQL in IoTDB-cli:

insert into root.sg.d1(timestamp,s0,s1) values(6,1.0,1.0);
insert into root.sg.d1(timestamp,s0,s1) values(7,1.0,1.0);
insert into root.sg.d1(timestamp,s0,s1) values(6,2.0,1.0);
insert into root.sg.d0(timestamp,s0) values(7,2.0);

The console of Flink will print the following data:

+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                Time_ |                  root.sg.d0.s0 |                  root.sg.d1.s0 |                  root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    7 |                         <NULL> |                            1.0 |                            1.0 |
| +I |                    6 |                         <NULL> |                            1.0 |                            1.0 |
| +I |                    6 |                         <NULL> |                            2.0 |                            1.0 |
| +I |                    7 |                            2.0 |                         <NULL> |                         <NULL> |

Write Mode (Sink)

Streaming Sink

Parameters

ParameterRequiredDefaultTypeDescription
nodeUrlsNo127.0.0.1:6667StringSpecifies the datanode address of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.
userNorootStringIoTDB username
passwordNorootStringIoTDB password
alignedNofalseBooleanWhether to call the aligned interface when writing data to IoTDB.

Example

This example demonstrates how to write data to IoTDB in a Flink Table Streaming Job:

  • Generate a source data table using the datagen connector.
  • Register an output table using the IoTDB connector.
  • Insert data from the source table into the output table.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingSinkTest {
    public static void main(String[] args) {
        // setup environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create data source table
        Schema dataGenTableSchema = Schema
                .newBuilder()
                .column("Time_", DataTypes.BIGINT())
                .column("root.sg.d0.s0", DataTypes.FLOAT())
                .column("root.sg.d1.s0", DataTypes.FLOAT())
                .column("root.sg.d1.s1", DataTypes.FLOAT())
                .build();
        TableDescriptor descriptor = TableDescriptor
                .forConnector("datagen")
                .schema(dataGenTableSchema)
                .option("rows-per-second", "1")
                .option("fields.Time_.kind", "sequence")
                .option("fields.Time_.start", "1")
                .option("fields.Time_.end", "5")
                .option("fields.root.sg.d0.s0.min", "1")
                .option("fields.root.sg.d0.s0.max", "5")
                .option("fields.root.sg.d1.s0.min", "1")
                .option("fields.root.sg.d1.s0.max", "5")
                .option("fields.root.sg.d1.s1.min", "1")
                .option("fields.root.sg.d1.s1.max", "5")
                .build();
        // register source table
        tableEnv.createTemporaryTable("dataGenTable", descriptor);
        Table dataGenTable = tableEnv.from("dataGenTable");

        // create iotdb sink table
        TableDescriptor iotdbDescriptor = TableDescriptor
                .forConnector("IoTDB")
                .schema(dataGenTableSchema)
                .build();
        tableEnv.createTemporaryTable("iotdbSinkTable", iotdbDescriptor);

        // insert data
        dataGenTable.executeInsert("iotdbSinkTable").print();
    }
}

After the above job is executed, the query result in the IoTDB CLI is as follows:

IoTDB> select ** from root;
+-----------------------------+-------------+-------------+-------------+
|                         Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00|    1.0833644|      2.34874|    1.2414109|
|1970-01-01T08:00:00.002+08:00|     4.929185|    3.1885583|    4.6980085|
|1970-01-01T08:00:00.003+08:00|    3.5206156|    3.5600138|    4.8080945|
|1970-01-01T08:00:00.004+08:00|    1.3449302|    2.8781595|    3.3195343|
|1970-01-01T08:00:00.005+08:00|    3.3079383|    3.3840187|    3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.054s

Batch Sink

Parameters

ParameterRequiredDefaultTypeDescription
nodeUrlsNo127.0.0.1:6667StringSpecifies the addresses of datanodes in IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.
userNorootStringIoTDB username
passwordNorootStringIoTDB password
alignedNofalseBooleanWhether to call the aligned interface when writing data to IoTDB.

Example

This example demonstrates how to write data to IoTDB in a Batch Job of a Flink Table:

  • Generate a source table using the IoTDB connector.
  • Register an output table using the IoTDB connector.
  • Write the renamed columns from the source table back to IoTDB.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class BatchSinkTest {
  public static void main(String[] args) {
    // setup environment
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inBatchMode()
            .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // create source table
    Schema sourceTableSchema = Schema
            .newBuilder()
            .column("Time_", DataTypes.BIGINT())
            .column("root.sg.d0.s0", DataTypes.FLOAT())
            .column("root.sg.d1.s0", DataTypes.FLOAT())
            .column("root.sg.d1.s1", DataTypes.FLOAT())
            .build();
    TableDescriptor sourceTableDescriptor = TableDescriptor
            .forConnector("IoTDB")
            .schema(sourceTableSchema)
            .option("sql", "select ** from root.sg.d0,root.sg.d1")
            .build();

    tableEnv.createTemporaryTable("sourceTable", sourceTableDescriptor);
    Table sourceTable = tableEnv.from("sourceTable");
    // register sink table
    Schema sinkTableSchema = Schema
            .newBuilder()
            .column("Time_", DataTypes.BIGINT())
            .column("root.sg.d2.s0", DataTypes.FLOAT())
            .column("root.sg.d3.s0", DataTypes.FLOAT())
            .column("root.sg.d3.s1", DataTypes.FLOAT())
            .build();
    TableDescriptor sinkTableDescriptor = TableDescriptor
            .forConnector("IoTDB")
            .schema(sinkTableSchema)
            .build();
    tableEnv.createTemporaryTable("sinkTable", sinkTableDescriptor);

    // insert data
    sourceTable.renameColumns(
            $("root.sg.d0.s0").as("root.sg.d2.s0"),
            $("root.sg.d1.s0").as("root.sg.d3.s0"),
            $("root.sg.d1.s1").as("root.sg.d3.s1")
    ).insertInto("sinkTable").execute().print();
  }
}

After the above task is executed, the query result in the IoTDB cli is as follows:

IoTDB> select ** from root;
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
|                         Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|root.sg.d2.s0|root.sg.d3.s0|root.sg.d3.s1|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00|    1.0833644|      2.34874|    1.2414109|    1.0833644|      2.34874|    1.2414109|
|1970-01-01T08:00:00.002+08:00|     4.929185|    3.1885583|    4.6980085|     4.929185|    3.1885583|    4.6980085|
|1970-01-01T08:00:00.003+08:00|    3.5206156|    3.5600138|    4.8080945|    3.5206156|    3.5600138|    4.8080945|
|1970-01-01T08:00:00.004+08:00|    1.3449302|    2.8781595|    3.3195343|    1.3449302|    2.8781595|    3.3195343|
|1970-01-01T08:00:00.005+08:00|    3.3079383|    3.3840187|    3.7278645|    3.3079383|    3.3840187|    3.7278645|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.015s

Copyright © 2024 The Apache Software Foundation.
Apache IoTDB, IoTDB, Apache, the Apache feather logo, and the Apache IoTDB project logo are either registered trademarks or trademarks of The Apache Software Foundation in all countries

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.