Stream Computing Framework
The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engine changes, transform changed data, and push transformed data outward.
We call a stream processing task a Pipe. A Pipe contains three subtasks:
- Source task
- Processor task
- Sink task
The stream processing framework allows users to customize the processing logic of the three subtasks in Java and to process data in a UDF-like manner.
In a Pipe, the three subtasks are implemented and executed by three types of plugins, and data flows through these plugins sequentially:
Pipe Source extracts data, Pipe Processor processes data, and Pipe Sink sends data; the final data is sent to an external system.
The model for a Pipe task is as follows:

A data stream processing task essentially describes the attributes of the Pipe Source, Pipe Processor, and Pipe Sink plugins.
Users can configure the specific attributes of these three subtasks declaratively using SQL statements. By combining different attributes, flexible data ETL (Extract, Transform, Load) capabilities can be achieved.
Using the stream processing framework, it is possible to build a complete data pipeline to fulfill various requirements such as edge-to-cloud synchronization, remote disaster recovery, and read/write load balancing across multiple databases.
Custom Stream Processing Plugin Development
Programming development dependencies
It is recommended to use Maven to build the project. Add the following dependencies in the pom.xml file. Please make sure to choose dependencies with the same version as the IoTDB server version.
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>pipe-api</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
Event-Driven Programming Model
The design of user programming interfaces for stream processing plugins follows the principles of the event-driven programming model. In this model, events serve as the abstraction of data in the user programming interface. The programming interface is decoupled from the specific execution method, allowing the focus to be on describing how the system expects events (data) to be processed upon arrival.
In the user programming interface of stream processing plugins, events abstract the write operations of database data. Events are captured by the local stream processing engine and passed sequentially through the three stages of stream processing, namely Pipe Source, Pipe Processor, and Pipe Sink plugins. User logic is triggered and executed within these three plugins.
To accommodate both low-latency stream processing in low-load scenarios and high-throughput stream processing in high-load scenarios at the edge, the stream processing engine dynamically chooses the processing objects from operation logs and data files. Therefore, the user programming interface for stream processing requires the user to provide the handling logic for two types of events: TabletInsertionEvent for operation log write events and TsFileInsertionEvent for data file write events.
TabletInsertionEvent
The TabletInsertionEvent is a high-level data abstraction for user write requests, which provides the ability to manipulate the underlying data of the write request by providing a unified operation interface.
For different database deployments, the underlying storage structure corresponding to the operation log write event is different. For stand-alone deployment scenarios, the operation log write event is an encapsulation of write-ahead log (WAL) entries; for distributed deployment scenarios, the operation log write event is an encapsulation of individual node consensus protocol operation log entries.
For write operations generated by different write request interfaces of the database, the request data structure corresponding to the operation log write event also differs. IoTDB provides many write interfaces, such as InsertRecord, InsertRecords, InsertTablet, and InsertTablets, and each kind of write request uses a completely different serialisation method and generates different binary entries.
The existence of operation log write events provides users with a unified view of data operations, which shields the implementation differences of the underlying data structures, greatly reduces the programming threshold for users, and improves the ease of use of the functionality.
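To make the row-by-row style of processing concrete, here is a minimal, self-contained sketch. It does not use the real pipe-api classes: `SketchRow`, `SketchRowCollector`, `SketchTabletEvent`, and `RowByRowSketch` are hypothetical stand-ins invented for illustration, and the stand-in `processRowByRow` returns a single event rather than an `Iterable`. A real plugin would work against the interface listed next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a row of a write request: one timestamp and one value. */
final class SketchRow {
  final long time;
  final double value;

  SketchRow(long time, double value) {
    this.time = time;
    this.value = value;
  }
}

/** Hypothetical stand-in for RowCollector: gathers the rows the consumer decides to keep. */
final class SketchRowCollector {
  final List<SketchRow> rows = new ArrayList<>();

  void collectRow(SketchRow row) {
    rows.add(row);
  }
}

/** Hypothetical stand-in for TabletInsertionEvent, reduced to an in-memory list of rows. */
final class SketchTabletEvent {
  private final List<SketchRow> rows;

  SketchTabletEvent(List<SketchRow> rows) {
    this.rows = rows;
  }

  List<SketchRow> rows() {
    return rows;
  }

  /** Feeds every row to the consumer and wraps whatever was collected into a new event. */
  SketchTabletEvent processRowByRow(BiConsumer<SketchRow, SketchRowCollector> consumer) {
    SketchRowCollector collector = new SketchRowCollector();
    for (SketchRow row : rows) {
      consumer.accept(row, collector);
    }
    return new SketchTabletEvent(collector.rows);
  }
}

final class RowByRowSketch {
  /** Keeps only rows whose value is at least the threshold. */
  static SketchTabletEvent keepAtLeast(SketchTabletEvent event, double threshold) {
    return event.processRowByRow(
        (row, collector) -> {
          if (row.value >= threshold) {
            collector.collectRow(row);
          }
        });
  }
}
```

The consumer sees one row at a time and decides what to hand to the collector, so the same user logic applies regardless of which write interface produced the data.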
/** TabletInsertionEvent is used to define the event of data insertion. */
public interface TabletInsertionEvent extends Event {
  /**
   * The consumer processes the data row by row and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer);
  /**
   * The consumer processes the Tablet directly and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
}
TsFileInsertionEvent
The TsFileInsertionEvent represents a high-level abstraction of the database's disk flush operation and is a collection of multiple TabletInsertionEvents.
IoTDB's storage engine is based on the LSM (Log-Structured Merge) structure. When data is written, the write operations are first flushed to log-structured files, while the written data is also stored in memory. When the memory reaches its capacity limit, a flush operation is triggered, converting the data in memory into a database file while deleting the previously written log entries. During the conversion from memory data to database file data, two compression processes, encoding compression and universal compression, are applied. As a result, the data in the database file occupies less space compared to the original data in memory.
In extreme network conditions, directly transferring data files is more cost-effective than transmitting individual write operations. It consumes lower network bandwidth and achieves faster transmission speed. However, there is no such thing as a free lunch. Performing calculations on data in the disk file incurs additional costs for file I/O compared to performing calculations directly on data in memory. Nevertheless, the coexistence of disk data files and memory write operations permits dynamic trade-offs and adjustments. It is based on this observation that the data file write event is introduced into the event model of the plugin.
In summary, the data file write event appears in the event stream of stream processing plugins in the following two scenarios:
- Historical data extraction: Before a stream processing task starts, all persisted write data exists in the form of TsFiles. When collecting historical data at the beginning of a stream processing task, the historical data is abstracted as TsFileInsertionEvent. 
- Real-time data extraction: During the execution of a stream processing task, if the speed of processing the log entries representing real-time operations is slower than the rate of write requests, the unprocessed log entries will be persisted to disk in the form of TsFiles. When these data are extracted by the stream processing engine, they are abstracted as TsFileInsertionEvent. 
/**
 * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
 * which is compressed and encoded, and requires IO cost for computational processing.
 */
public interface TsFileInsertionEvent extends Event {
  /**
   * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
   *
   * @return {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent
   */
  Iterable<TabletInsertionEvent> toTabletInsertionEvents();
}
Custom Stream Processing Plugin Programming Interface Definition
Based on the custom stream processing plugin programming interface, users can easily write data extraction plugins, data processing plugins, and data sending plugins, allowing the stream processing functionality to adapt flexibly to various industrial scenarios.
Data Extraction Plugin Interface
Data extraction is the first stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data extraction plugin (PipeSource) serves as a bridge between the stream processing engine and the storage engine. It captures various data write events by listening to the behavior of the storage engine.
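As a rough illustration of the source side, the sketch below buffers observed writes in a queue and hands them out one at a time. `QueueSourceSketch` and its `onWrite` hook are hypothetical names, and events are plain strings here. The sketch mirrors only the `start()`, `supply()`, and `close()` steps of the interface listed next, including the convention that `supply()` may return null when no event is currently available.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a source plugin; it mirrors only start(), supply() and close(). */
final class QueueSourceSketch {
  private final Queue<String> pending = new ArrayDeque<>();
  private boolean started = false;

  /** Called by an imaginary storage-engine listener whenever a write is observed. */
  void onWrite(String event) {
    pending.add(event);
  }

  void start() {
    started = true;
  }

  /** Returns the next event, or null when nothing is available at the moment. */
  String supply() {
    if (!started) {
      throw new IllegalStateException("supply() called before start()");
    }
    return pending.poll();
  }

  void close() {
    started = false;
    pending.clear();
  }
}
```

Returning null instead of blocking lets the caller decide when to poll again, which matches the "no more events at the moment" wording in the interface documentation.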
/**
 * PipeSource
 *
 * <p>PipeSource is responsible for capturing events from sources.
 *
 * <p>Various data sources can be supported by implementing different PipeSource classes.
 *
 * <p>The lifecycle of a PipeSource is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH SOURCE` clause in SQL are
 *       parsed and the validation method {@link PipeSource#validate(PipeParameterValidator)} will
 *       be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} will be called to
 *       config the runtime behavior of the PipeSource.
 *   <li>Then the method {@link PipeSource#start()} will be called to start the PipeSource.
 *   <li>While the collaboration task is in progress, the method {@link PipeSource#supply()} will be
 *       called to capture events from sources and then the events will be passed to the
 *       PipeProcessor.
 *   <li>The method {@link PipeSource#close()} will be called when the collaboration task is
 *       cancelled (the `DROP PIPE` command is executed).
 * </ul>
 */
public interface PipeSource extends PipePlugin {
  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;
  /**
   * This method is mainly used to customize PipeSource. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeSourceRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link PipeSource#validate(PipeParameterValidator)}
   * is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeSource
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration configuration)
      throws Exception;
  /**
   * Start the source. After this method is called, events should be ready to be supplied by
   * {@link PipeSource#supply()}. This method is called after {@link
   * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.
   *
   * @throws Exception the user can throw errors if necessary
   */
  void start() throws Exception;
  /**
   * Supply single event from the source and the caller will send the event to the processor.
   * This method is called after {@link PipeSource#start()} is called.
   *
   * @return the event to be supplied. the event may be null if the source has no more events at
   *     the moment, but the source is still running for more events.
   * @throws Exception the user can throw errors if necessary
   */
  Event supply() throws Exception;
}
Data Processing Plugin Interface
Data processing is the second stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data processing plugin (PipeProcessor) is primarily used for filtering and transforming the various events captured by the data extraction plugin (PipeSource).
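The filtering role described above can be sketched as follows. All types here (`MiniTablet`, `MiniTsFile`, `MiniCollector`, `PrefixFilterProcessorSketch`) are hypothetical stand-ins, not pipe-api classes. The sketch assumes a processor that forwards only events for devices under a given path prefix, and it unpacks file events into tablet events in the same way as the default `process(TsFileInsertionEvent, ...)` method in the interface listed next.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a tablet event: a device path plus the values written to it. */
final class MiniTablet {
  final String device;
  final List<Double> values;

  MiniTablet(String device, List<Double> values) {
    this.device = device;
    this.values = values;
  }
}

/** Hypothetical stand-in for a TsFile event: a bundle of tablet events. */
final class MiniTsFile {
  final List<MiniTablet> tablets;

  MiniTsFile(List<MiniTablet> tablets) {
    this.tablets = tablets;
  }
}

/** Hypothetical stand-in for EventCollector: receives whatever the processor lets through. */
final class MiniCollector {
  final List<MiniTablet> collected = new ArrayList<>();

  void collect(MiniTablet tablet) {
    collected.add(tablet);
  }
}

/** Forwards only tablets whose device path starts with a configured prefix. */
final class PrefixFilterProcessorSketch {
  private final String prefix;

  PrefixFilterProcessorSketch(String prefix) {
    this.prefix = prefix;
  }

  void process(MiniTablet tablet, MiniCollector collector) {
    if (tablet.device.startsWith(prefix)) {
      collector.collect(tablet);
    }
  }

  /** File events are unpacked and handled tablet by tablet. */
  void process(MiniTsFile file, MiniCollector collector) {
    for (MiniTablet tablet : file.tablets) {
      process(tablet, collector);
    }
  }
}
```

Because file events delegate to the tablet handler, the filtering rule is written once and applies to both kinds of event.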
/**
 * PipeProcessor
 *
 * <p>PipeProcessor is used to filter and transform the Event formed by the PipeSource.
 *
 * <p>The lifecycle of a PipeProcessor is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
 *       parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeProcessor.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeSource captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeSink. The
 *             following 3 methods will be called: {@link
 *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
 *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
 *             PipeProcessor#process(Event, EventCollector)}.
 *         <li>PipeSink serializes the events into binaries and send them to sinks.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeProcessor#close() } method will be called.
 * </ul>
 */
public interface PipeProcessor extends PipePlugin {
  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;
  /**
   * This method is mainly used to customize PipeProcessor. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeProcessorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
   * events processing.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeProcessor
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
      throws Exception;
  /**
   * This method is called to process the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
      throws Exception;
  /**
   * This method is called to process the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
      throws Exception {
    for (final TabletInsertionEvent tabletInsertionEvent :
        tsFileInsertionEvent.toTabletInsertionEvents()) {
      process(tabletInsertionEvent, eventCollector);
    }
  }
  /**
   * This method is called to process the Event.
   *
   * @param event Event to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(Event event, EventCollector eventCollector) throws Exception;
}
Data Sending Plugin Interface
Data sending is the third stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data sending plugin (PipeSink) is responsible for sending the various events processed by the data processing plugin (PipeProcessor). It serves as the network implementation layer of the stream processing framework and should support multiple real-time communication protocols and connectors in its interface.
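The connection handling expected of a sink can be sketched in isolation. The class below is a hypothetical stand-in in which the "connection" is only a boolean flag. `simulateDisconnect` and `checkConnection` are invented for illustration; the latter shows one way a framework-side periodic check could re-run `handshake()` when `heartbeat()` fails, as described in the interface documentation listed next.

```java
/** Hypothetical stand-in for a sink plugin; the "connection" is just a boolean flag. */
final class ReconnectingSinkSketch {
  private boolean connected = false;
  int handshakes = 0;
  int transferred = 0;

  /** Establishes (or re-establishes) the connection. */
  void handshake() {
    connected = true;
    handshakes++;
  }

  /** Throws when the connection is no longer alive. */
  void heartbeat() {
    if (!connected) {
      throw new IllegalStateException("connection lost");
    }
  }

  /** Sends one event; fails if there is no live connection. */
  void transfer(String event) {
    if (!connected) {
      throw new IllegalStateException("cannot transfer: not connected");
    }
    transferred++;
  }

  /** Test hook that simulates a broken network link. */
  void simulateDisconnect() {
    connected = false;
  }

  /** One possible periodic check: handshake again if the heartbeat fails. */
  static void checkConnection(ReconnectingSinkSketch sink) {
    try {
      sink.heartbeat();
    } catch (IllegalStateException e) {
      sink.handshake();
    }
  }
}
```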
/**
 * PipeSink
 *
 * <p>PipeSink is responsible for sending events to sinks.
 *
 * <p>Various network protocols can be supported by implementing different PipeSink classes.
 *
 * <p>The lifecycle of a PipeSink is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH SINK` clause in SQL are
 *       parsed and the validation method {@link PipeSink#validate(PipeParameterValidator)} will be
 *       called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link PipeSink#customize(PipeParameters,
 *       PipeSinkRuntimeConfiguration)} will be called to configure the runtime behavior of the
 *       PipeSink and the method {@link PipeSink#handshake()} will be called to create a connection
 *       with sink.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeSource captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeSink.
 *         <li>PipeSink serializes the events into binaries and send them to sinks. The following 3
 *             methods will be called: {@link PipeSink#transfer(TabletInsertionEvent)}, {@link
 *             PipeSink#transfer(TsFileInsertionEvent)} and {@link PipeSink#transfer(Event)}.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeSink#close() } method will be called.
 * </ul>
 *
 * <p>In addition, the method {@link PipeSink#heartbeat()} will be called periodically to check
 * whether the connection with sink is still alive. The method {@link PipeSink#handshake()} will be
 * called to create a new connection with the sink when the method {@link PipeSink#heartbeat()}
 * throws exceptions.
 */
public interface PipeSink extends PipePlugin {
  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;
  /**
   * This method is mainly used to customize PipeSink. In this method, the user can do the following
   * things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeSinkRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link PipeSink#validate(PipeParameterValidator)} is
   * called and before the method {@link PipeSink#handshake()} is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeSink
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeSinkRuntimeConfiguration configuration)
      throws Exception;
  /**
   * This method is used to create a connection with sink. This method will be called after the
   * method {@link PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is called or
   * will be called when the method {@link PipeSink#heartbeat()} throws exceptions.
   *
   * @throws Exception if the connection is failed to be created
   */
  void handshake() throws Exception;
  /**
   * This method will be called periodically to check whether the connection with sink is still
   * alive.
   *
   * @throws Exception if the connection dies
   */
  void heartbeat() throws Exception;
  /**
   * This method is used to transfer the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
  /**
   * This method is used to transfer the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
    try {
      for (final TabletInsertionEvent tabletInsertionEvent :
          tsFileInsertionEvent.toTabletInsertionEvents()) {
        transfer(tabletInsertionEvent);
      }
    } finally {
      tsFileInsertionEvent.close();
    }
  }
  /**
   * This method is used to transfer the generic events, including HeartbeatEvent.
   *
   * @param event Event to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(Event event) throws Exception;
}
Custom Stream Processing Plugin Management
To ensure the flexibility and usability of user-defined plugins in production environments, the system needs to provide the capability to dynamically manage plugins. This section introduces the management statements for stream processing plugins, which enable the dynamic and unified management of plugins.
Load Plugin Statement
In IoTDB, to dynamically load a user-defined plugin into the system, you first need to implement a specific plugin class based on PipeSource, PipeProcessor, or PipeSink. Then, you need to compile and package the plugin class into an executable jar file. Finally, you can use the loading plugin management statement to load the plugin into IoTDB.
The syntax of the loading plugin management statement is as follows:
CREATE PIPEPLUGIN [IF NOT EXISTS] <alias>
AS <Full class name>
USING <URL of jar file>
IF NOT EXISTS semantics: Used in creation operations so that the create command is executed only when the specified Pipe Plugin does not exist, preventing errors caused by attempting to create an existing Pipe Plugin.
For example, suppose a user implements a data processing plugin with the fully qualified class name "edu.tsinghua.iotdb.pipe.ExampleProcessor", packages it into a jar file stored at "https://example.com:8080/iotdb/pipe-plugin.jar", and wants to use it in the stream processing engine under the alias "example". The creation statement for this data processing plugin is as follows:
CREATE PIPEPLUGIN IF NOT EXISTS example
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor'
USING URI <https://example.com:8080/iotdb/pipe-plugin.jar>
Delete Plugin Statement
When a user no longer wants to use a plugin and needs to uninstall it from the system, the delete plugin statement shown below can be used.
DROP PIPEPLUGIN [IF EXISTS] <alias>
IF EXISTS semantics: Used in deletion operations so that the delete command is executed only when the specified Pipe Plugin exists, preventing errors caused by attempting to delete a non-existent Pipe Plugin.
Show Plugin Statement
Users can also view the plugins in the system when needed. The statement for viewing plugins is as follows.
SHOW PIPEPLUGINS
System Pre-installed Stream Processing Plugin
Pre-built Source Plugin
iotdb-source
Function: Extract historical or realtime data inside IoTDB into pipe.
| key | value | value range | required or optional with default | 
|---|---|---|---|
| source | iotdb-source | String: iotdb-source | required | 
| source.pattern | path prefix for filtering time series | String: any time series prefix | optional: root | 
| source.history.start-time | start of the event-time range for synchronizing historical data, inclusive | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
| source.history.end-time | end of the event-time range for synchronizing historical data, inclusive | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
| start-time(V1.3.1+) | start of the event-time range for synchronizing all data, inclusive. If configured, disables "history.start-time" and "history.end-time" | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
| end-time(V1.3.1+) | end of the event-time range for synchronizing all data, inclusive. If configured, disables "history.start-time" and "history.end-time" | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
🚫 source.pattern Parameter Description
The pattern must use backquotes to quote illegal characters or illegal path nodes. For example, to filter root.`a@b` or root.`123`, set the pattern to root.`a@b` or root.`123` (for details, refer to "Timing of single and double quotes and backquotes").
In the underlying implementation, synchronization is more efficient when the pattern is root (the default value) or a database name; any other format reduces performance.
The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter 'source.pattern'='root.aligned.1', data from the following series will be synchronized:
- root.aligned.1TS
- root.aligned.1TS.`1`
- root.aligned.100TS
while data from the following series will not be synchronized:
- root.aligned.`123`
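The prefix behaviour in the example above can be reproduced with a plain string prefix test. This is only an illustrative sketch: `PatternPrefixSketch` is a hypothetical helper, and it assumes the pattern is compared against the textual path character by character, which is consistent with the four example series but is not taken from the IoTDB implementation.

```java
/** Hypothetical helper illustrating prefix matching of a series path against a pattern. */
final class PatternPrefixSketch {
  /** Returns true when the series path begins with the configured pattern. */
  static boolean matches(String pattern, String seriesPath) {
    return seriesPath.startsWith(pattern);
  }
}
```

Under this assumption, `matches("root.aligned.1", "root.aligned.100TS")` is true because the path text starts with the pattern, while `matches("root.aligned.1", "root.aligned.`123`")` is false because the character after `root.aligned.` is a backquote rather than `1`.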
❗️start-time, end-time parameter description of source
- start-time and end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00. Versions 1.3.1 and later also support the timestamp format, such as 1706704494000.
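To show how the two accepted forms relate, the following sketch converts either form to epoch milliseconds using `java.time`. `PipeTimeParserSketch` is a hypothetical helper, not IoTDB's parser. In particular, the time zone used for ISO strings without an offset is an explicit parameter here, because the text above does not say which zone IoTDB applies.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;

/** Hypothetical helper that accepts either a numeric timestamp or an ISO-8601 date-time. */
final class PipeTimeParserSketch {
  /** Converts an epoch-millisecond number or an ISO-8601 date-time string to epoch milliseconds. */
  static long toEpochMillis(String text, ZoneId zoneForLocalTimes) {
    String trimmed = text.trim();
    if (trimmed.matches("-?\\d+")) {
      return Long.parseLong(trimmed); // timestamp form, e.g. 1706704494000
    }
    try {
      // ISO form with an explicit offset, e.g. 2011-12-03T10:15:30+01:00
      return OffsetDateTime.parse(trimmed).toInstant().toEpochMilli();
    } catch (DateTimeParseException e) {
      // ISO form without an offset, e.g. 2011-12-03T10:15:30, interpreted in the given zone
      return LocalDateTime.parse(trimmed).atZone(zoneForLocalTimes).toInstant().toEpochMilli();
    }
  }
}
```

With `ZoneOffset.UTC` as the zone, `2011-12-03T10:15:30` resolves to an instant exactly one hour later than `2011-12-03T10:15:30+01:00`, which shows why an offset-free value is ambiguous unless the zone is known.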
✅ Each piece of data, from its production to its arrival in IoTDB, involves two key notions of time
- event time: the time when the data is actually produced (or the generation time assigned to the data by the data production system, which is a time item in the data point).
- arrival time: the time when the data arrives in the IoTDB system.
The out-of-order data we often refer to is data whose event time, at the moment of arrival, lags far behind the current system time (or the maximum event time already persisted to disk). Whether data is out-of-order or sequential, as long as it newly arrives in the system, its arrival time increases in the order in which it reaches IoTDB.
💎 The work of iotdb-source can be split into two stages
- Historical data extraction: all data whose arrival time < the current system time when the pipe is created is called historical data
- Realtime data extraction: all data whose arrival time >= the current system time when the pipe is created is called realtime data
The historical data transmission phase and the realtime data transmission phase are executed serially: the realtime data transmission phase starts only after the historical data transmission phase has completed.
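The split between the two stages depends only on arrival time, not on event time. The following sketch states that rule as code; `ExtractionPhaseSketch` and its `Phase` enum are hypothetical names used purely for illustration.

```java
/** Hypothetical helper that classifies data by arrival time relative to pipe creation. */
final class ExtractionPhaseSketch {
  enum Phase {
    HISTORICAL,
    REALTIME
  }

  /** Data that arrived strictly before the pipe was created is historical; the rest is realtime. */
  static Phase classify(long arrivalTimeMillis, long pipeCreationTimeMillis) {
    return arrivalTimeMillis < pipeCreationTimeMillis ? Phase.HISTORICAL : Phase.REALTIME;
  }
}
```

Note that a late, out-of-order point with an old event time still counts as realtime data under this rule if it arrives after the pipe was created.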
Pre-built Processor Plugin
do-nothing-processor
Function: Do not do anything with the events passed in by the source.
| key | value | value range | required or optional with default | 
|---|---|---|---|
| processor | do-nothing-processor | String: do-nothing-processor | required | 
Pre-built Sink Plugin
do-nothing-sink
Function: Does not do anything with the events passed in by the processor.
| key | value | value range | required or optional with default | 
|---|---|---|---|
| sink | do-nothing-sink | String: do-nothing-sink | required | 
Stream Processing Task Management
Create Stream Processing Task
A stream processing task can be created using the CREATE PIPE statement. A sample SQL statement is shown below:
CREATE PIPE <PipeId> -- PipeId is the name that uniquely identifies the sync task
WITH SOURCE (
  -- Default IoTDB Data Extraction Plugin
  'source'                    = 'iotdb-source',
  -- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery
  'source.pattern'            = 'root.timecho',
  -- Whether to extract historical data
  'source.history.enable'     = 'true',
  -- Describes the time range of the historical data being extracted, indicating the earliest time
  'source.history.start-time' = '2011-12-03T10:15:30+01:00',
  -- Describes the time range of the historical data being extracted, indicating the latest time
  'source.history.end-time'   = '2022-12-03T10:15:30+01:00',
  -- Whether to extract realtime data
  'source.realtime.enable'    = 'true',
)
WITH PROCESSOR (
  -- Default data processing plugin, means no processing
  'processor'                    = 'do-nothing-processor',
)
WITH SINK (
  -- IoTDB data sending plugin with target IoTDB
  'sink'                    = 'iotdb-thrift-sink',
  -- Data service for one of the DataNode nodes on the target IoTDB ip
  'sink.ip'                 = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'sink.port'               = '6667',
)
To create a stream processing task, it is necessary to configure the PipeId and the parameters of the three plugin sections:
| configuration item | description | required or optional | default implementation | default implementation description | whether custom implementations are allowed |
|---|---|---|---|---|---|
| pipeId | Name that globally and uniquely identifies a sync task | required | - | - | - |
| source | Pipe Source plugin, for extracting synchronized data at the bottom of the database | optional | iotdb-source | Integrates all historical data of the database and subsequent realtime data into the sync task | no |
| processor | Pipe Processor plugin, for processing data | optional | do-nothing-processor | No processing of incoming data | yes |
| sink | Pipe Sink plugin, for sending data | required | - | - | yes |
In the example, the iotdb-source, do-nothing-processor, and iotdb-thrift-sink plugins are used to build the data synchronisation task. IoTDB has other built-in plugins; see the "System Pre-installed Stream Processing Plugin" section.
An example of a minimalist CREATE PIPE statement is as follows:
CREATE PIPE <PipeId> -- PipeId is a name that uniquely identifies the task.
WITH SINK (
  -- IoTDB data sending plugin with target IoTDB
  'sink'      = 'iotdb-thrift-sink',
  -- Data service for one of the DataNode nodes on the target IoTDB ip
  'sink.ip'   = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'sink.port' = '6667',
)
The expressed semantics are: synchronise all historical data and all subsequently arriving realtime data from this database instance to the target IoTDB instance at 127.0.0.1:6667.
Note:
- SOURCE and PROCESSOR are optional, if no configuration parameters are filled in, the system will use the corresponding default implementation. 
- The SINK is a mandatory configuration that needs to be declared in the CREATE PIPE statement for configuring purposes. 
- The SINK is reusable. If the SINKs of different tasks have identical KV properties (every key has the same value), the system creates only one SINK instance so that connection resources are reused.
  - For example, consider the following declarations of pipe1 and pipe2:
    CREATE PIPE pipe1
    WITH SINK (
      'sink' = 'iotdb-thrift-sink',
      'sink.thrift.host' = 'localhost',
      'sink.thrift.port' = '9999',
    )
    CREATE PIPE pipe2
    WITH SINK (
      'sink' = 'iotdb-thrift-sink',
      'sink.thrift.port' = '9999',
      'sink.thrift.host' = 'localhost',
    )
  - Since their SINK declarations are identical (even though some properties appear in a different order), the framework automatically reuses the SINK, so pipe1 and pipe2 share the same SINK instance.
- Avoid building scenarios that involve cyclic data synchronisation, because they result in an infinite loop:
  - IoTDB A -> IoTDB B -> IoTDB A
  - IoTDB A -> IoTDB A
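The SINK reuse rule in the notes above (identical KV properties, regardless of declaration order, lead to a single instance) can be illustrated with a small Java sketch. This is not IoTDB's implementation: `SinkRegistry` and `SinkInstance` are hypothetical names, and keying the registry by a sorted copy of the attributes is simply one plausible way to make the comparison independent of order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: SinkRegistry and SinkInstance are not IoTDB classes.
final class SinkRegistry {

  /** Stand-in for a real sink; it only remembers the attributes it was built from. */
  static final class SinkInstance {
    final Map<String, String> attributes;

    SinkInstance(Map<String, String> attributes) {
      this.attributes = attributes;
    }
  }

  // Keyed by a sorted copy of the attributes, so declaration order does not matter.
  private final Map<Map<String, String>, SinkInstance> sinks = new HashMap<>();

  /** Returns the existing sink for these attributes, or creates one if none exists. */
  SinkInstance getOrCreate(Map<String, String> declaredAttributes) {
    Map<String, String> key = new TreeMap<>(declaredAttributes);
    return sinks.computeIfAbsent(key, SinkInstance::new);
  }

  /** Number of distinct sink instances created so far. */
  int instanceCount() {
    return sinks.size();
  }
}
```

Under this sketch, calling `getOrCreate` with the attributes of pipe1 and then with those of pipe2 returns the same object, while changing any value (for example the port) produces a second instance.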
Start Stream Processing Task
After the CREATE PIPE statement executes successfully, the task-related instances are created. In V1.3.0, the task's running status is then set to STOPPED, meaning the task does not immediately process data. In V1.3.1 and later, the status of the task is set to RUNNING after creation.
You can use the START PIPE statement to make the stream processing task start processing data:
START PIPE <PipeId>
Stop Stream Processing Task
Use the STOP PIPE statement to stop the stream processing task from processing data:
STOP PIPE <PipeId>
Delete Stream Processing Task
If a stream processing task is in the RUNNING state, you can use the DROP PIPE statement to stop it and delete the entire task:
DROP PIPE <PipeId>
Before deleting a stream processing task, there is no need to execute the STOP operation.
Show Stream Processing Task
Use the SHOW PIPES statement to view all stream processing tasks:
SHOW PIPES
The query results are as follows:
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
|         ID|          CreationTime |  State|PipeSource|PipeProcessor|PipeSink|ExceptionMessage|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING|       ...|          ...|     ...|              {}|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED|       ...|          ...|     ...| TException: ...|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
You can use <PipeId> to view the status of a specific stream processing task:
SHOW PIPE <PipeId>
Additionally, the WHERE clause can be used to determine whether the Pipe Sink used by a specific <PipeId> is being reused:
SHOW PIPES
WHERE SINK USED BY <PipeId>
Stream Processing Task Running Status Migration
A stream processing task passes through the following states during the lifecycle of a data synchronization pipe:
- RUNNING: The pipe is actively processing data.
  - After a pipe is successfully created, its initial state is set to RUNNING (V1.3.1+).
- STOPPED: The pipe has stopped running. This can happen in the following cases:
  - After a pipe is successfully created, its initial state is set to STOPPED (V1.3.0).
  - The user manually pauses a pipe that is running normally, changing its status from RUNNING to STOPPED.
  - The pipe encounters an unrecoverable error during execution, and its status automatically changes from RUNNING to STOPPED.
- DROPPED: The pipe is permanently deleted.
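The states listed above, together with the START PIPE, STOP PIPE, and DROP PIPE statements, can be modelled as a small state machine. The following Java sketch is only an illustration: `PipeState` and its methods are hypothetical names, not part of the IoTDB API, and the allowed transitions are read from this section (a pipe can be dropped from either RUNNING or STOPPED, since no STOP is needed before DROP).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration only: PipeState is not an IoTDB class.
enum PipeState {
  RUNNING,
  STOPPED,
  DROPPED;

  /** States reachable from this one, as described in this section. */
  Set<PipeState> nextStates() {
    switch (this) {
      case RUNNING:
        // STOP PIPE or an unrecoverable error leads to STOPPED; DROP PIPE leads to DROPPED.
        return EnumSet.of(STOPPED, DROPPED);
      case STOPPED:
        // START PIPE leads to RUNNING; DROP PIPE leads to DROPPED.
        return EnumSet.of(RUNNING, DROPPED);
      default:
        // DROPPED is terminal: the pipe is permanently deleted.
        return EnumSet.noneOf(PipeState.class);
    }
  }

  /** True if a pipe in this state can move directly to the target state. */
  boolean canTransitionTo(PipeState target) {
    return nextStates().contains(target);
  }

  /** Initial state after CREATE PIPE: STOPPED in V1.3.0, RUNNING in V1.3.1 and later. */
  static PipeState initialState(boolean isV130) {
    return isV130 ? STOPPED : RUNNING;
  }
}
```

For example, `PipeState.RUNNING.canTransitionTo(PipeState.STOPPED)` is true, while no transition out of `DROPPED` is allowed.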
The following diagram illustrates the different states and their transitions:

Authority Management
Stream Processing Task
| Authority Name | Description | 
|---|---|
| USE_PIPE | Register task, path-independent |
| USE_PIPE | Start task, path-independent |
| USE_PIPE | Stop task, path-independent |
| USE_PIPE | Uninstall task, path-independent |
| USE_PIPE | Query task, path-independent |
Stream Processing Task Plugin
| Authority Name | Description | 
|---|---|
| USE_PIPE | Register stream processing task plugin, path-independent |
| USE_PIPE | Delete stream processing task plugin, path-independent |
| USE_PIPE | Query stream processing task plugin, path-independent |
Configure Parameters
In iotdb-system.properties:
V1.3.0:
####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
V1.3.1+:
####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
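The comment on pipe_subtask_executor_max_thread_num states that the actual thread count is min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). The Java sketch below works through that formula. `PipeSubtaskThreads` and `effectiveThreadCount` are hypothetical names rather than IoTDB code, and the use of integer division for "CPU core number / 2" is an assumption.

```java
// Hypothetical helper, not IoTDB code: mirrors the formula in the configuration comment.
final class PipeSubtaskThreads {

  /**
   * Computes min(configuredMax, max(1, cpuCores / 2)).
   * Assumption: the division is integer division, so 1 core still yields 1 thread.
   */
  static int effectiveThreadCount(int configuredMax, int cpuCores) {
    return Math.min(configuredMax, Math.max(1, cpuCores / 2));
  }
}
```

With the default value of 5, a 4-core machine would use 2 threads under this reading, a 16-core machine would be capped at 5, and a single-core machine would still get 1. Passing `Runtime.getRuntime().availableProcessors()` as `cpuCores` gives the figure for the current host.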