Data Sync
6/11/26
About 2 min
Data Sync
This document lists the SQL statements for the data synchronization feature. For a detailed introduction to the feature and usage instructions, see Data Sync.
1. Create Task
Syntax:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId is the name that uniquely identifies the task
-- Data extraction plugin, optional plugin
WITH SOURCE (
[<parameter> = <value>,],
)
-- Data processing plugin, optional plugin
WITH PROCESSOR (
[<parameter> = <value>,],
)
-- Data connection plugin, required plugin
WITH SINK (
[<parameter> = <value>,],
)
Example 1: Full Data Synchronization
create pipe A2B
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
Example 2: Partial Data Synchronization
create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'mode.streaming' = 'true',
'database-name'='db_b.*',
'start-time' = '2023.08.23T08:00:00+00:00',
'end-time' = '2023.10.23T08:00:00+00:00'
)
with SINK (
'sink'='iotdb-thrift-async-sink',
'node-urls' = '127.0.0.1:6668',
)
Example 3: Edge-Cloud Data Transmission
- Execute the following statement on IoTDB B to synchronize data from B to A
create pipe BA
with source (
'database-name'='db_b.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667',
)
- Execute the following statement on IoTDB C to synchronize data from C to A
create pipe CA
with source (
'database-name'='db_c.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
- Execute the following statement on IoTDB D to synchronize data from D to A
create pipe DA
with source (
'database-name'='db_d.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)
Example 4: Cascaded Data Transmission
- Execute the following statement on IoTDB A to synchronize data from A to B
create pipe AB
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
- Execute the following statement on IoTDB B to synchronize data from B to C
create pipe BC
with source (
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)
Example 5: Compressed Synchronization
create pipe A2B
with sink (
'node-urls' = '127.0.0.1:6668',
'compressor' = 'snappy,lz4',
'rate-limit-bytes-per-second'='1048576'
)
Example 6: Encrypted Synchronization
create pipe A2B
with sink (
'sink'='iotdb-thrift-ssl-sink',
'node-urls'='127.0.0.1:6667',
'ssl.trust-store-path'='pki/trusted',
'ssl.trust-store-pwd'='root'
)
2. Start Task
Syntax:
START PIPE <PipeId>
Example:
START PIPE A2B
3. Stop Task
Syntax:
STOP PIPE <PipeId>
Example:
STOP PIPE A2B
4. Drop Task
Syntax:
DROP PIPE [IF EXISTS] <PipeId>
Example:
DROP PIPE IF EXISTS A2B
5. Show Tasks
Syntax:
-- Show all tasks
SHOW PIPES
-- Show a specific task
SHOW PIPE <PipeId>
Example:
SHOW PIPES
SHOW PIPE A2B
6. Alter Task
Syntax:
ALTER PIPE [IF EXISTS] <PipeId>
MODIFY/REPLACE SOURCE(...)
MODIFY/REPLACE PROCESSOR(...)
MODIFY/REPLACE SINK(...)
Example:
ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668');