IoTDB Data Sync

IoTDB data sync transfers data from IoTDB to another data platform. A data sync task is called a Pipe.

A Pipe consists of three subtasks (plugins):

  • Extract
  • Process
  • Connect

Pipe allows users to customize the processing logic of these three subtasks, just like handling data using UDF (User-Defined Functions). Within a Pipe, the aforementioned subtasks are executed and implemented by three types of plugins. Data flows through these three plugins sequentially: Pipe Extractor is used to extract data, Pipe Processor is used to process data, and Pipe Connector is used to send data to an external system.

The model of a Pipe task is as follows:

[Figure: pipe.png]

A Pipe describes a data sync task; in essence, it describes the attributes of the Pipe Extractor, Pipe Processor, and Pipe Connector plugins. Users can declaratively configure the specific attributes of the three subtasks through SQL statements. By combining different attributes, flexible data ETL (Extract, Transform, Load) capabilities can be achieved.
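
The sequential flow through the three plugins can be pictured with a small Python sketch. It is only a conceptual model, not IoTDB's plugin API: the `run_pipe` function, the `(path, timestamp, value)` event shape, and the stand-in plugins are all assumptions made for illustration.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

# A data point is modelled as (path, timestamp, value); this shape is an
# assumption for illustration only.
Event = Tuple[str, int, float]


def run_pipe(
    extract: Callable[[], Iterable[Event]],
    process: Callable[[Event], Iterable[Event]],
    connect: Callable[[Event], None],
) -> None:
    """Push every extracted event through the processor, then the connector."""
    for event in extract():
        for processed in process(event):
            connect(processed)


# Hypothetical stand-ins for the three plugins.
def extract_all() -> Iterator[Event]:
    yield ("root.db.d.m", 1, 1.0)
    yield ("root.db.d.m", 2, 2.0)


def do_nothing(event: Event) -> Iterator[Event]:
    yield event  # pass the event through unchanged


received: List[Event] = []
run_pipe(extract_all, do_nothing, received.append)
```

Swapping any of the three callables changes one stage without touching the other two, which is the property the plugin model relies on.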

By utilizing the data sync functionality, a complete data pipeline can be built to fulfill various requirements such as edge-to-cloud sync, remote disaster recovery, and read-write workload distribution across multiple databases.

Quick Start

🎯 Goal: Achieve full data sync of IoTDB A -> IoTDB B

  • Start two IoTDB instances: A (DataNode at 127.0.0.1:6667) and B (DataNode at 127.0.0.1:6668)

  • Create a Pipe from A to B by executing the following on A:

    create pipe a2b
    with connector (
      'connector'='iotdb-thrift-connector',
      'connector.ip'='127.0.0.1',
      'connector.port'='6668'
    )
    
  • Start the Pipe from A to B by executing the following on A:

    start pipe a2b
    
  • Write data to A

    INSERT INTO root.db.d(time, m) values (1, 1)
    
  • Check on B that the data has been synchronized from A:

    SELECT ** FROM root
    

❗️Note: The current IoTDB -> IoTDB implementation of data sync does not support DDL sync.

That is, operations such as TTL, trigger, alias, template, view, creating/deleting time series, and creating/deleting databases are not synchronized.

IoTDB -> IoTDB data sync requires one of the following on the target IoTDB:

  • If automatic metadata creation is enabled: manually configure the encoding and compression of each data type to be consistent with the sender
  • If automatic metadata creation is not enabled: manually create metadata that is consistent with the source

Sync Task Management

Create a sync task

A data sync task can be created using the CREATE PIPE statement. A sample SQL statement is shown below:

CREATE PIPE <PipeId> -- PipeId is the name that uniquely identifies the sync task
WITH EXTRACTOR (
  -- Default IoTDB Data Extraction Plugin
  'extractor'                    = 'iotdb-extractor',
  -- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery
  'extractor.pattern'            = 'root.timecho',
  -- Whether to extract historical data
  'extractor.history.enable'     = 'true',
  -- Start of the time range of the historical data to extract (earliest event time)
  'extractor.history.start-time' = '2011-12-03T10:15:30+01:00',
  -- End of the time range of the historical data to extract (latest event time)
  'extractor.history.end-time'   = '2022-12-03T10:15:30+01:00',
  -- Whether to extract realtime data
  'extractor.realtime.enable'    = 'true',
)
WITH PROCESSOR (
  -- Default data processing plugin, means no processing
  'processor'                    = 'do-nothing-processor',
)
WITH CONNECTOR (
  -- IoTDB data sending plugin with target IoTDB
  'connector'                    = 'iotdb-thrift-connector',
  -- Data service IP of one of the DataNodes of the target IoTDB
  'connector.ip'                 = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'connector.port'               = '6667',
)

To create a sync task it is necessary to configure the PipeId and the parameters of the three plugin sections:

| Configuration item | Description | Required or not | Default implementation | Default implementation description | Whether to allow custom implementations |
|---|---|---|---|---|---|
| pipeId | Name that globally and uniquely identifies a sync task | Required | - | - | - |
| extractor | Pipe Extractor plugin, for extracting the data to be synchronized from the underlying layer of the database | Optional | iotdb-extractor | Integrates all historical data of the database and subsequent realtime data into the sync task | No |
| processor | Pipe Processor plugin, for processing data | Optional | do-nothing-processor | No processing of incoming data | |
| connector | Pipe Connector plugin, for sending data | Required | - | - | |

In the example, the iotdb-extractor, do-nothing-processor, and iotdb-thrift-connector plugins are used to build the data sync task. IoTDB has other built-in data sync plugins; see the section "System Pre-built Data Sync Plugin".

An example of a minimal CREATE PIPE statement is as follows:

CREATE PIPE <PipeId> -- PipeId is a name that uniquely identifies the task.
WITH CONNECTOR (
  -- IoTDB data sending plugin with target IoTDB
  'connector'      = 'iotdb-thrift-connector',
  -- Data service IP of one of the DataNodes of the target IoTDB
  'connector.ip'   = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'connector.port' = '6667',
)

This statement synchronizes all historical data and all subsequently arriving realtime data from this database instance to the target IoTDB instance at 127.0.0.1:6667.
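
The way a minimal statement falls back to the default plugins can be sketched in Python as follows. This is a conceptual model only; `resolve_plugins` is a hypothetical helper, and the real system resolves plugins internally.

```python
from typing import Dict

# Default plugins named in the configuration table above.
DEFAULT_EXTRACTOR = "iotdb-extractor"
DEFAULT_PROCESSOR = "do-nothing-processor"


def resolve_plugins(attributes: Dict[str, str]) -> Dict[str, str]:
    """Return the plugin chosen for each stage, applying the documented defaults.

    `attributes` holds the KV pairs declared in a CREATE PIPE statement.
    The connector has no default, so its absence is reported as an error.
    """
    if "connector" not in attributes:
        raise ValueError("CONNECTOR is mandatory in CREATE PIPE")
    return {
        "extractor": attributes.get("extractor", DEFAULT_EXTRACTOR),
        "processor": attributes.get("processor", DEFAULT_PROCESSOR),
        "connector": attributes["connector"],
    }
```

Calling `resolve_plugins` with only the three connector attributes of the minimal statement yields `iotdb-extractor` and `do-nothing-processor` for the first two stages.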

Note:

  • EXTRACTOR and PROCESSOR are optional. If they are not configured, the system uses the corresponding default implementations.

  • CONNECTOR is mandatory and must be declared in the CREATE PIPE statement.

  • CONNECTOR instances are reused automatically. If the CONNECTOR declarations of different tasks have identical KV properties (every key has the same value), the system creates only one CONNECTOR instance, so that connection resources are shared.

    • For example, consider the following declarations of pipe1 and pipe2:
    CREATE PIPE pipe1
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.ip' = 'localhost',
      'connector.port' = '9999',
    )
    
    CREATE PIPE pipe2
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.port' = '9999',
      'connector.ip' = 'localhost',
    )
    
    • Since their CONNECTOR declarations are identical (even though some properties appear in a different order), the framework automatically reuses the CONNECTOR. Hence, pipe1 and pipe2 share the same CONNECTOR instance.
  • Avoid building scenarios that involve circular data sync, as they result in an infinite loop:

    • IoTDB A -> IoTDB B -> IoTDB A
    • IoTDB A -> IoTDB A
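
The CONNECTOR reuse rule in the notes above (identical KV properties, regardless of declaration order) can be modelled with an order-independent lookup key. The sketch below is an assumption about how such reuse could work, not IoTDB's implementation; `ConnectorRegistry` and `ConnectorInstance` are hypothetical names.

```python
from typing import Dict, FrozenSet, Tuple

ConnectorKey = FrozenSet[Tuple[str, str]]


class ConnectorInstance:
    """Placeholder for a real connector; it only remembers its attributes."""

    def __init__(self, attributes: Dict[str, str]) -> None:
        self.attributes = dict(attributes)


class ConnectorRegistry:
    """Hands out one shared instance per distinct set of KV properties."""

    def __init__(self) -> None:
        self._instances: Dict[ConnectorKey, ConnectorInstance] = {}

    def acquire(self, attributes: Dict[str, str]) -> ConnectorInstance:
        # A frozenset of (key, value) pairs ignores declaration order, so two
        # declarations with the same pairs map to the same registry entry.
        key = frozenset(attributes.items())
        if key not in self._instances:
            self._instances[key] = ConnectorInstance(attributes)
        return self._instances[key]
```

With this model, two declarations that differ only in property order receive the same object, while changing any single value produces a separate instance.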

START TASK

After the successful execution of the CREATE PIPE statement, task-related instances will be created. However, the overall task's running status will be set to STOPPED, meaning the task will not immediately process data.

You can use the START PIPE statement to begin processing data for a task:

START PIPE <PipeId>

STOP TASK

The STOP PIPE statement can be used to halt data processing:

STOP PIPE <PipeId>

DELETE TASK

If a task is in the RUNNING state, you can use the DROP PIPE statement to stop the data processing and delete the entire task:

DROP PIPE <PipeId>

Before deleting a task, there is no need to execute the STOP operation.

SHOW TASK

You can use the SHOW PIPES statement to view all tasks:

SHOW PIPES

The query results are as follows:

+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|         ID|          CreationTime |  State|PipeExtractor|PipeProcessor|PipeConnector|ExceptionMessage|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING|          ...|          ...|          ...|            None|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED|          ...|          ...|          ...| TException: ...|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+

To view the status of a particular sync task, specify its <PipeId>:

SHOW PIPE <PipeId>

Additionally, the WHERE clause can be used to determine if the Pipe Connector used by a specific <PipeId> is being reused.

SHOW PIPES
WHERE CONNECTOR USED BY <PipeId>

Task Running Status Migration

The task running status can transition through several states during the lifecycle of a data synchronization pipe:

  • STOPPED: The pipe is in a stopped state. It can have the following possibilities:
    • After a pipe is successfully created, its initial state is STOPPED
    • The user manually pauses a pipe that is in normal running state, transitioning its status from RUNNING to STOPPED
    • If a pipe encounters an unrecoverable error during execution, its status automatically changes from RUNNING to STOPPED.
  • RUNNING: The pipe is actively processing data
  • DROPPED: The pipe is permanently deleted

The following diagram illustrates the different states and their transitions:

[Figure: state migration diagram]
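
The transitions described above can also be written down as a small lookup table. The following Python sketch is illustrative only: the action names (`START`, `STOP`, `DROP`, `ERROR`) are labels chosen here, dropping a STOPPED pipe is assumed to be allowed, and any combination the text does not describe is rejected, whereas the real system may treat some of them as no-ops.

```python
from enum import Enum


class PipeState(Enum):
    STOPPED = "STOPPED"
    RUNNING = "RUNNING"
    DROPPED = "DROPPED"


# (current state, action) -> next state. "ERROR" stands for an unrecoverable
# error during execution; it is a label used only in this sketch.
TRANSITIONS = {
    (PipeState.STOPPED, "START"): PipeState.RUNNING,
    (PipeState.RUNNING, "STOP"): PipeState.STOPPED,
    (PipeState.RUNNING, "ERROR"): PipeState.STOPPED,
    (PipeState.RUNNING, "DROP"): PipeState.DROPPED,
    (PipeState.STOPPED, "DROP"): PipeState.DROPPED,  # assumed to be allowed
}

# State of a pipe right after a successful CREATE PIPE.
INITIAL_STATE = PipeState.STOPPED


def next_state(state: PipeState, action: str) -> PipeState:
    """Return the state reached by applying `action`, or raise ValueError."""
    try:
        return TRANSITIONS[(state, action)]
    except KeyError:
        raise ValueError(f"{action} is not allowed in state {state.name}") from None
```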

System Pre-built Data Sync Plugin

View pre-built plugin

Users can view the plugins in the system on demand. The statement for viewing plugins is shown below.

SHOW PIPEPLUGINS

Pre-built Extractor Plugin

iotdb-extractor

Function: Extract historical or realtime data inside IoTDB into pipe.

| key | value | value range | required or optional with default |
|---|---|---|---|
| extractor | iotdb-extractor | String: iotdb-extractor | required |
| extractor.pattern | path prefix for filtering time series | String: any time series prefix | optional: root |
| extractor.history.enable | whether to synchronize historical data | Boolean: true, false | optional: true |
| extractor.history.start-time | start event time of the historical data to synchronize, inclusive | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
| extractor.history.end-time | end event time of the historical data to synchronize, inclusive | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
| extractor.realtime.enable | whether to synchronize realtime data | Boolean: true, false | optional: true |

🚫 extractor.pattern Parameter Description

  • The pattern must use backquotes to wrap illegal characters or illegal path nodes. For example, to filter root.`a@b` or root.`123`, set the pattern to root.`a@b` or root.`123` (for details, refer to "Timing of single and double quotes and backquotes").

  • In the underlying implementation, when pattern is detected as root (default value), synchronization efficiency is higher, and any other format will reduce performance.

  • The path prefix does not need to form a complete path. For example, when a pipe is created with the parameter 'extractor.pattern'='root.aligned.1', data under the following paths will be synchronized:

    • root.aligned.1TS
    • root.aligned.1TS.`1`
    • root.aligned.100TS

    while data under the following paths will not be synchronized:

    • root.aligned.`1`
    • root.aligned.`123`
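
The examples above are consistent with a plain string-prefix test on the full series path. A minimal Python sketch of that reading follows; `matches_pattern` is a hypothetical helper, and the actual matching inside IoTDB may handle more cases.

```python
def matches_pattern(path: str, pattern: str = "root") -> bool:
    """Return True when the series path starts with the pattern string.

    The comparison is on raw characters, so a backquoted node such as `1`
    does not match a pattern that ends in an unquoted 1.
    """
    return path.startswith(pattern)


pattern = "root.aligned.1"
synced = ["root.aligned.1TS", "root.aligned.1TS.`1`", "root.aligned.100TS"]
skipped = ["root.aligned.`1`", "root.aligned.`123`"]
```

Every path in `synced` passes the check for this pattern, and every path in `skipped` fails it, because the character after `root.aligned.` is a backquote rather than `1`.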

❗️ extractor.history.start-time and extractor.history.end-time Parameter Description

  • start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00
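
To relate such ISO strings to the Long value range listed in the parameter table, an ISO timestamp can be converted to epoch milliseconds. The Python sketch below assumes millisecond precision and treats input without a zone offset as UTC; both are assumptions for illustration, and IoTDB's own parsing may differ (for example, by applying a configured time zone).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_epoch_millis(text: str) -> int:
    """Convert an ISO 8601 timestamp string to milliseconds since the epoch."""
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Assumption of this sketch: zone-less input is interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of two timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)
```

Under these assumptions, `2011-12-03T10:15:30+01:00` denotes an instant one hour earlier than `2011-12-03T10:15:30`.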

A piece of data, from its production to its arrival in IoTDB, involves two key concepts of time:

  • event time: the time when the data is actually produced (or the generation time assigned to the data by the data production system, which is a time item in the data point).
  • arrival time: the time when the data arrives in the IoTDB system.

Out-of-order data usually refers to data whose event time, at the moment of arrival, is far behind the current system time (or the maximum event time already persisted to disk). Arrival time, on the other hand, increases with the order in which data arrives at IoTDB, regardless of whether the data is out-of-order or sequential.

💎 The work of iotdb-extractor can be split into two stages:

  1. Historical data extraction: all data with arrival time < the current system time when the pipe is created is called historical data
  2. Realtime data extraction: all data with arrival time >= the current system time when the pipe is created is called realtime data

The historical data transmission phase and the realtime data transmission phase are executed serially: the realtime data transmission phase starts only after the historical data transmission phase has completed.
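
The split between the two stages depends only on arrival time, not on event time. A minimal Python sketch of that rule follows; the `(event_time, arrival_time)` tuple shape and the function names are assumptions made for illustration.

```python
from typing import List, Tuple

# Each point is (event_time, arrival_time); the shape is illustrative only.
Point = Tuple[int, int]


def classify(arrival_time: int, pipe_creation_time: int) -> str:
    """Label a point by comparing its arrival time with the pipe creation time."""
    return "historical" if arrival_time < pipe_creation_time else "realtime"


def split_phases(
    points: List[Point], pipe_creation_time: int
) -> Tuple[List[Point], List[Point]]:
    """Return (historical, realtime); the first list is transmitted first."""
    historical = [p for p in points if classify(p[1], pipe_creation_time) == "historical"]
    realtime = [p for p in points if classify(p[1], pipe_creation_time) == "realtime"]
    return historical, realtime
```

For example, a point with an old event time that arrives after the pipe is created still counts as realtime data, because only its arrival time is compared.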

Users can configure iotdb-extractor for:

  • Historical data extraction ('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'false')
  • Realtime data extraction ('extractor.history.enable' = 'false', 'extractor.realtime.enable' = 'true')
  • Full data extraction ('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'true')
  • Setting both extractor.history.enable and extractor.realtime.enable to false at the same time is not allowed
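
The four combinations of the two switches can be summarised in a short Python sketch. `extraction_mode` is a hypothetical helper, and the error raised for the forbidden combination is this sketch's choice rather than IoTDB's actual message.

```python
def extraction_mode(history_enable: bool = True, realtime_enable: bool = True) -> str:
    """Map the two enable flags to the extraction mode they select.

    Both flags default to True, matching the documented defaults.
    """
    if history_enable and realtime_enable:
        return "full"
    if history_enable:
        return "historical"
    if realtime_enable:
        return "realtime"
    raise ValueError(
        "extractor.history.enable and extractor.realtime.enable "
        "must not both be false"
    )
```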

Pre-built Processor Plugin

do-nothing-processor

Function: Do not do anything with the events passed in by the extractor.

| key | value | value range | required or optional with default |
|---|---|---|---|
| processor | do-nothing-processor | String: do-nothing-processor | required |

Pre-built Connector Plugin

iotdb-thrift-sync-connector (alias: iotdb-thrift-connector)

Function: Primarily used for data transfer between IoTDB instances (v1.2.0+). Data is transmitted using the Thrift RPC framework and a single-threaded blocking IO model. It guarantees that the receiving end applies the data in the same order as the sending end receives the write requests.

Limitation: Both the source and target IoTDB versions need to be v1.2.0+.

| key | value | value range | required or optional with default |
|---|---|---|---|
| connector | iotdb-thrift-connector or iotdb-thrift-sync-connector | String: iotdb-thrift-connector or iotdb-thrift-sync-connector | required |
| connector.ip | the data service IP of one of the DataNode nodes in the target IoTDB | String | optional: fill in either this or connector.node-urls |
| connector.port | the data service port of one of the DataNode nodes in the target IoTDB | Integer | optional: fill in either this or connector.node-urls |
| connector.node-urls | the URLs of the data service ports of any number of DataNode nodes in the target IoTDB | String, e.g. '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | optional: fill in either this or connector.ip:connector.port |

📌 Please ensure that the receiving end has already created all the time series present in the sending end or has enabled automatic metadata creation. Otherwise, it may result in the failure of the pipe operation.
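
The two ways of naming the target (connector.ip with connector.port, or connector.node-urls) can be normalised into one list of endpoints. The Python sketch below is illustrative: `resolve_endpoints` is a hypothetical helper, and because the behaviour when both forms are supplied is not stated here, the sketch simply concatenates them.

```python
from typing import Dict, List, Tuple


def resolve_endpoints(attributes: Dict[str, str]) -> List[Tuple[str, int]]:
    """Collect (host, port) pairs from the connector attributes."""
    endpoints: List[Tuple[str, int]] = []
    if "connector.ip" in attributes and "connector.port" in attributes:
        endpoints.append(
            (attributes["connector.ip"], int(attributes["connector.port"]))
        )
    # node-urls is a comma-separated list of host:port entries.
    for url in attributes.get("connector.node-urls", "").split(","):
        url = url.strip()
        if url:
            host, _, port = url.rpartition(":")
            endpoints.append((host, int(port)))
    if not endpoints:
        raise ValueError(
            "either connector.ip and connector.port, or connector.node-urls, is required"
        )
    return endpoints
```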

iotdb-thrift-async-connector

Function: Primarily used for data transfer between IoTDB instances (v1.2.0+).
Data is transmitted using the Thrift RPC framework, employing a multi-threaded async non-blocking IO model, resulting in high transfer performance. It is particularly suitable for distributed scenarios on the target end.
It does not guarantee that the receiving end applies the data in the same order as the sending end receives the write requests, but it guarantees data integrity (at-least-once).

Limitation: Both the source and target IoTDB versions need to be v1.2.0+.

| key | value | value range | required or optional with default |
|---|---|---|---|
| connector | iotdb-thrift-async-connector | String: iotdb-thrift-async-connector | required |
| connector.ip | the data service IP of one of the DataNode nodes in the target IoTDB | String | optional: fill in either this or connector.node-urls |
| connector.port | the data service port of one of the DataNode nodes in the target IoTDB | Integer | optional: fill in either this or connector.node-urls |
| connector.node-urls | the URLs of the data service ports of any number of DataNode nodes in the target IoTDB | String, e.g. '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | optional: fill in either this or connector.ip:connector.port |

📌 Please ensure that the receiving end has already created all the time series present in the sending end or has enabled automatic metadata creation. Otherwise, it may result in the failure of the pipe operation.

iotdb-legacy-pipe-connector

Function: Mainly used to transfer data from IoTDB (v1.2.0+) to lower versions of IoTDB, using the data synchronization (Sync) protocol before version v1.2.0.
Data is transmitted using the Thrift RPC framework. It employs a single-threaded sync blocking IO model, resulting in weak transfer performance.

Limitation: The source IoTDB version needs to be v1.2.0+. The target IoTDB version can be either v1.2.0+ or v1.1.x (lower versions of IoTDB are theoretically supported but untested).

Note: In theory, any version prior to v1.2.0 of IoTDB can serve as the data synchronization (Sync) receiver for v1.2.0+.

| key | value | value range | required or optional with default |
|---|---|---|---|
| connector | iotdb-legacy-pipe-connector | String: iotdb-legacy-pipe-connector | required |
| connector.ip | the data service IP of one of the DataNode nodes in the target IoTDB | String | required |
| connector.port | the data service port of one of the DataNode nodes in the target IoTDB | Integer | required |
| connector.user | the user name of the target IoTDB. Note that the user needs to have data writing and TsFile Load permissions. | String | optional: root |
| connector.password | the password of the target IoTDB. Note that the user needs to have data writing and TsFile Load permissions. | String | optional: root |
| connector.version | the version of the target IoTDB, used to disguise its actual version and bypass the version consistency check of the target. | String | optional: 1.1 |

📌 Make sure that the receiver has created all the time series on the sender side, or that automatic metadata creation is turned on, otherwise the pipe run will fail.

do-nothing-connector

Function: Does not do anything with the events passed in by the processor.

| key | value | value range | required or optional with default |
|---|---|---|---|
| connector | do-nothing-connector | String: do-nothing-connector | required |

Authority Management

| Authority Name | Description |
|---|---|
| CREATE_PIPE | Register task, path-independent |
| START_PIPE | Start task, path-independent |
| STOP_PIPE | Stop task, path-independent |
| DROP_PIPE | Uninstall task, path-independent |
| SHOW_PIPES | Query task, path-independent |

Configure Parameters

In iotdb-common.properties:

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1

# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8

# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
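
The comment on pipe_subtask_executor_max_thread_num gives the formula min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). A short Python sketch makes the effect concrete; it assumes the division is integer division, which the configuration comment does not state explicitly.

```python
def effective_subtask_threads(configured_max: int, cpu_cores: int) -> int:
    """Apply min(configured_max, max(1, cpu_cores / 2)) with integer division."""
    return min(configured_max, max(1, cpu_cores // 2))
```

Under that assumption, with the default value of 5, a 4-core machine uses 2 threads, a single-core machine uses 1, and a 16-core machine is capped at 5.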

Functionality Features

At-least-once semantic guarantee

The data synchronization feature provides at-least-once delivery semantics when transferring data to external systems. In most scenarios, all data is in fact synchronized exactly once.

However, in the following scenarios, it is possible for some data to be synchronized multiple times (due to resumable transmission):

  • Temporary network failures: If a data transmission request fails, the system will retry sending it until reaching the maximum retry attempts.
  • Abnormal implementation of the Pipe plugin logic: If an error is thrown during the plugin's execution, the system will retry sending the data until reaching the maximum retry attempts.
  • Data partition switching due to node failures or restarts: After the partition change is completed, the affected data will be retransmitted.
  • Cluster unavailability: Once the cluster becomes available again, the affected data will be retransmitted.
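
The retry scenarios above explain why a receiver may see the same data more than once. The Python sketch below simulates one such case, a lost acknowledgement, and shows that a receiver keyed by path and timestamp ends up with a single stored value. All names are hypothetical, and keying by (path, timestamp) is an assumption about the receiving side made for illustration.

```python
from typing import Callable, Dict, List, Tuple

Point = Tuple[str, int, float]  # (path, timestamp, value), illustrative shape


def send_with_retry(
    point: Point, send: Callable[[Point], None], max_attempts: int = 3
) -> bool:
    """Retry `send` until it succeeds or the attempt limit is reached."""
    for _ in range(max_attempts):
        try:
            send(point)
            return True
        except ConnectionError:
            continue
    return False


class Receiver:
    """Stores points by (path, timestamp); loses the first acknowledgement."""

    def __init__(self) -> None:
        self.deliveries: List[Point] = []
        self.store: Dict[Tuple[str, int], float] = {}
        self._lose_next_ack = True

    def receive(self, point: Point) -> None:
        path, timestamp, value = point
        self.deliveries.append(point)
        self.store[(path, timestamp)] = value  # a repeated key overwrites
        if self._lose_next_ack:
            self._lose_next_ack = False
            # The data was applied, but the sender never learns about it.
            raise ConnectionError("acknowledgement lost")
```

Sending one point through this receiver produces two deliveries but only one stored entry, which is the practical meaning of at-least-once delivery combined with an idempotent write.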

Source End: Asynchronous Decoupling of Data Writing from Pipe Processing and Data Transmission

In the data synchronization feature, data transfer adopts an asynchronous replication mode.

Data synchronization is completely decoupled from the writing operation, eliminating any impact on the critical path of writing. This mechanism allows the framework to maintain the writing speed of a time-series database while ensuring continuous data synchronization.
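
The decoupling can be pictured as a write path that only enqueues work, with a background worker doing the transfer. The Python sketch below is a conceptual model under that assumption; `AsyncSync` is a hypothetical class and does not reflect IoTDB's internal design.

```python
import queue
import threading
from typing import List, Tuple

Point = Tuple[str, int, float]  # (path, timestamp, value), illustrative shape
_STOP = object()  # sentinel that tells the worker to exit


class AsyncSync:
    """Write path appends locally and enqueues; a worker thread syncs later."""

    def __init__(self) -> None:
        self.storage: List[Point] = []
        self.synced: List[Point] = []
        self._pending: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def write(self, point: Point) -> None:
        # The write returns as soon as the point is stored and queued; it
        # never waits for the transfer to the remote side.
        self.storage.append(point)
        self._pending.put(point)

    def _drain(self) -> None:
        while True:
            item = self._pending.get()
            if item is _STOP:
                return
            self.synced.append(item)  # stands in for sending to the target

    def close(self) -> None:
        """Flush everything queued so far and stop the worker."""
        self._pending.put(_STOP)
        self._worker.join()
```

After `close()` returns, everything that was written has also been handed to the stand-in for the connector, in the same order.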

Source End: High Availability of the Pipe Service in a Highly Available Cluster Deployment

When the sender IoTDB is deployed in high availability cluster mode, the data synchronization service is also highly available. The data synchronization framework monitors the data synchronization progress of each data node and periodically takes lightweight distributed consistent snapshots to preserve the synchronization state.

  • In the event of a failure of a data node in the sender cluster, the data synchronization framework can leverage the consistent snapshot and the data stored in replicas to quickly recover and resume synchronization, thus achieving high availability of the data synchronization service.
  • In the event of a complete failure and restart of the sender cluster, the data synchronization framework can also use snapshots to recover the synchronization service.

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation
