Skip to main content

IoTDB Stream Processing Framework

...About 17 min

IoTDB Stream Processing Framework

The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engine changes, transform changed data, and push transformed data outward.

We call . A stream processing task (Pipe) contains three subtasks:

  • Extract
  • Process
  • Send (Connect)

The stream processing framework allows users to customize the processing logic of three subtasks using Java language and process data in a UDF-like manner.
In a Pipe, the three subtasks mentioned above are executed and implemented by three types of plugins. Data flows through these three plugins sequentially for processing:
Pipe Extractor is used to extract data, Pipe Processor is used to process data, Pipe Connector is used to send data, and the final data will be sent to an external system.

The model for a Pipe task is as follows:

pipe.png
A data stream processing task essentially describes the attributes of the Pipe Extractor, Pipe Processor, and Pipe Connector plugins.

Users can configure the specific attributes of these three subtasks declaratively using SQL statements. By combining different attributes, flexible data ETL (Extract, Transform, Load) capabilities can be achieved.

Using the stream processing framework, it is possible to build a complete data pipeline to fulfill various requirements such as edge-to-cloud synchronization, remote disaster recovery, and read/write load balancing across multiple databases.

Custom Stream Processing Plugin Development

Programming development dependencies

It is recommended to use Maven to build the project. Add the following dependencies in the pom.xml file. Please make sure to choose dependencies with the same version as the IoTDB server version.

<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>pipe-api</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
</dependency>

Event-Driven Programming Model

The design of user programming interfaces for stream processing plugins follows the principles of the event-driven programming model. In this model, events serve as the abstraction of data in the user programming interface. The programming interface is decoupled from the specific execution method, allowing the focus to be on describing how the system expects events (data) to be processed upon arrival.

In the user programming interface of stream processing plugins, events abstract the write operations of database data. Events are captured by the local stream processing engine and passed sequentially through the three stages of stream processing, namely Pipe Extractor, Pipe Processor, and Pipe Connector plugins. User logic is triggered and executed within these three plugins.

To accommodate both low-latency stream processing in low-load scenarios and high-throughput stream processing in high-load scenarios at the edge, the stream processing engine dynamically chooses the processing objects from operation logs and data files. Therefore, the user programming interface for stream processing requires the user to provide the handling logic for two types of events: TabletInsertionEvent for operation log write events and TsFileInsertionEvent for data file write events.

TabletInsertionEvent

The TabletInsertionEvent is a high-level data abstraction for user write requests, which provides the ability to manipulate the underlying data of the write request by providing a unified operation interface.

For different database deployments, the underlying storage structure corresponding to the operation log write event is different. For stand-alone deployment scenarios, the operation log write event is an encapsulation of write-ahead log (WAL) entries; for distributed deployment scenarios, the operation log write event is an encapsulation of individual node consensus protocol operation log entries.

For write operations generated by different write request interfaces of the database, the data structure of the request structure corresponding to the operation log write event is also different.IoTDB provides many write interfaces such as InsertRecord, InsertRecords, InsertTablet, InsertTablets, and so on, and each kind of write request uses a completely different serialisation method to generate a write request. completely different serialisation methods and generate different binary entries.

The existence of operation log write events provides users with a unified view of data operations, which shields the implementation differences of the underlying data structures, greatly reduces the programming threshold for users, and improves the ease of use of the functionality.

/** TabletInsertionEvent is used to define the event of data insertion. */
public interface TabletInsertionEvent extends Event {

  /**
   * The consumer processes the data row by row and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer);

  /**
   * The consumer processes the Tablet directly and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
}

TsFileInsertionEvent

The TsFileInsertionEvent represents a high-level abstraction of the database's disk flush operation and is a collection of multiple TabletInsertionEvents.

IoTDB's storage engine is based on the LSM (Log-Structured Merge) structure. When data is written, the write operations are first flushed to log-structured files, while the written data is also stored in memory. When the memory reaches its capacity limit, a flush operation is triggered, converting the data in memory into a database file while deleting the previously written log entries. During the conversion from memory data to database file data, two compression processes, encoding compression and universal compression, are applied. As a result, the data in the database file occupies less space compared to the original data in memory.

In extreme network conditions, directly transferring data files is more cost-effective than transmitting individual write operations. It consumes lower network bandwidth and achieves faster transmission speed. However, there is no such thing as a free lunch. Performing calculations on data in the disk file incurs additional costs for file I/O compared to performing calculations directly on data in memory. Nevertheless, the coexistence of disk data files and memory write operations permits dynamic trade-offs and adjustments. It is based on this observation that the data file write event is introduced into the event model of the plugin.

In summary, the data file write event appears in the event stream of stream processing plugins in the following two scenarios:

  1. Historical data extraction: Before a stream processing task starts, all persisted write data exists in the form of TsFiles. When collecting historical data at the beginning of a stream processing task, the historical data is abstracted as TsFileInsertionEvent.

  2. Real-time data extraction: During the execution of a stream processing task, if the speed of processing the log entries representing real-time operations is slower than the rate of write requests, the unprocessed log entries will be persisted to disk in the form of TsFiles. When these data are extracted by the stream processing engine, they are abstracted as TsFileInsertionEvent.

/**
 * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
 * which is compressed and encoded, and requires IO cost for computational processing.
 */
public interface TsFileInsertionEvent extends Event {

  /**
   * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
   *
   * @return {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent
   */
  Iterable<TabletInsertionEvent> toTabletInsertionEvents();
}

Custom Stream Processing Plugin Programming Interface Definition

Based on the custom stream processing plugin programming interface, users can easily write data extraction plugins, data processing plugins, and data sending plugins, allowing the stream processing functionality to adapt flexibly to various industrial scenarios.

Data Extraction Plugin Interface

Data extraction is the first stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data extraction plugin (PipeExtractor) serves as a bridge between the stream processing engine and the storage engine. It captures various data write events by listening to the behavior of the storage engine.

/**
 * PipeExtractor
 *
 * <p>PipeExtractor is responsible for capturing events from sources.
 *
 * <p>Various data sources can be supported by implementing different PipeExtractor classes.
 *
 * <p>The lifecycle of a PipeExtractor is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are
 *       parsed and the validation method {@link PipeExtractor#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeExtractor.
 *   <li>Then the method {@link PipeExtractor#start()} will be called to start the PipeExtractor.
 *   <li>While the collaboration task is in progress, the method {@link PipeExtractor#supply()} will
 *       be called to capture events from sources and then the events will be passed to the
 *       PipeProcessor.
 *   <li>The method {@link PipeExtractor#close()} will be called when the collaboration task is
 *       cancelled (the `DROP PIPE` command is executed).
 * </ul>
 */
public interface PipeExtractor extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeExtractor. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeExtractorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeExtractor#validate(PipeParameterValidator)} is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeExtractor
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * Start the extractor. After this method is called, events should be ready to be supplied by
   * {@link PipeExtractor#supply()}. This method is called after {@link
   * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
   *
   * @throws Exception the user can throw errors if necessary
   */
  void start() throws Exception;

  /**
   * Supply single event from the extractor and the caller will send the event to the processor.
   * This method is called after {@link PipeExtractor#start()} is called.
   *
   * @return the event to be supplied. the event may be null if the extractor has no more events at
   *     the moment, but the extractor is still running for more events.
   * @throws Exception the user can throw errors if necessary
   */
  Event supply() throws Exception;
}

Data Processing Plugin Interface

Data processing is the second stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data processing plugin (PipeProcessor) is primarily used for filtering and transforming the various events captured by the data extraction plugin (PipeExtractor).

/**
 * PipeProcessor
 *
 * <p>PipeProcessor is used to filter and transform the Event formed by the PipeExtractor.
 *
 * <p>The lifecycle of a PipeProcessor is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
 *       parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeProcessor.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeConnector. The
 *             following 3 methods will be called: {@link
 *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
 *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
 *             PipeProcessor#process(Event, EventCollector)}.
 *         <li>PipeConnector serializes the events into binaries and send them to sinks.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeProcessor#close() } method will be called.
 * </ul>
 */
public interface PipeProcessor extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeProcessor. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeProcessorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
   * events processing.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeProcessor
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * This method is called to process the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
          throws Exception;

  /**
   * This method is called to process the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
          throws Exception {
    for (final TabletInsertionEvent tabletInsertionEvent :
            tsFileInsertionEvent.toTabletInsertionEvents()) {
      process(tabletInsertionEvent, eventCollector);
    }
  }

  /**
   * This method is called to process the Event.
   *
   * @param event Event to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(Event event, EventCollector eventCollector) throws Exception;
}

Data Sending Plugin Interface

Data sending is the third stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data sending plugin (PipeConnector) is responsible for sending the various events processed by the data processing plugin (PipeProcessor). It serves as the network implementation layer of the stream processing framework and should support multiple real-time communication protocols and connectors in its interface.

/**
 * PipeConnector
 *
 * <p>PipeConnector is responsible for sending events to sinks.
 *
 * <p>Various network protocols can be supported by implementing different PipeConnector classes.
 *
 * <p>The lifecycle of a PipeConnector is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH CONNECTOR` clause in SQL are
 *       parsed and the validation method {@link PipeConnector#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeConnector and the method {@link
 *       PipeConnector#handshake()} will be called to create a connection with sink.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeConnector.
 *         <li>PipeConnector serializes the events into binaries and send them to sinks. The
 *             following 3 methods will be called: {@link
 *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
 *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
 *             PipeConnector#transfer(Event)}.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeConnector#close() } method will be called.
 * </ul>
 *
 * <p>In addition, the method {@link PipeConnector#heartbeat()} will be called periodically to check
 * whether the connection with sink is still alive. The method {@link PipeConnector#handshake()}
 * will be called to create a new connection with the sink when the method {@link
 * PipeConnector#heartbeat()} throws exceptions.
 */
public interface PipeConnector extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeConnector. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeConnectorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeConnector#validate(PipeParameterValidator)} is called and before the method {@link
   * PipeConnector#handshake()} is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeConnector
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * This method is used to create a connection with sink. This method will be called after the
   * method {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is
   * called or will be called when the method {@link PipeConnector#heartbeat()} throws exceptions.
   *
   * @throws Exception if the connection is failed to be created
   */
  void handshake() throws Exception;

  /**
   * This method will be called periodically to check whether the connection with sink is still
   * alive.
   *
   * @throws Exception if the connection dies
   */
  void heartbeat() throws Exception;

  /**
   * This method is used to transfer the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;

  /**
   * This method is used to transfer the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
    for (final TabletInsertionEvent tabletInsertionEvent :
            tsFileInsertionEvent.toTabletInsertionEvents()) {
      transfer(tabletInsertionEvent);
    }
  }

  /**
   * This method is used to transfer the Event.
   *
   * @param event Event to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(Event event) throws Exception;
}

Custom Stream Processing Plugin Management

To ensure the flexibility and usability of user-defined plugins in production environments, the system needs to provide the capability to dynamically manage plugins. This section introduces the management statements for stream processing plugins, which enable the dynamic and unified management of plugins.

Load Plugin Statement

In IoTDB, to dynamically load a user-defined plugin into the system, you first need to implement a specific plugin class based on PipeExtractor, PipeProcessor, or PipeConnector. Then, you need to compile and package the plugin class into an executable jar file. Finally, you can use the loading plugin management statement to load the plugin into IoTDB.

The syntax of the loading plugin management statement is as follows:

CREATE PIPEPLUGIN <alias>
AS <Full class name>
USING <URL of jar file>

For example, if a user implements a data processing plugin with the fully qualified class name "edu.tsinghua.iotdb.pipe.ExampleProcessor" and packages it into a jar file, which is stored at "https://example.com:8080/iotdb/pipe-plugin.jaropen in new window", and the user wants to use this plugin in the stream processing engine, marking the plugin as "example". The creation statement for this data processing plugin is as follows:

CREATE PIPEPLUGIN example
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor'
USING URI '<https://example.com:8080/iotdb/pipe-plugin.jar>'

Delete Plugin Statement

When user no longer wants to use a plugin and needs to uninstall the plugin from the system, you can use the Remove plugin statement as shown below.

DROP PIPEPLUGIN <alias>

Show Plugin Statement

User can also view the plugin in the system on need. The statement to view plugin is as follows.

SHOW PIPEPLUGINS

System Pre-installed Stream Processing Plugin

Pre-built extractor Plugin

iotdb-extractor

Function: Extract historical or realtime data inside IoTDB into pipe.

keyvaluevalue rangerequired or optional with default
extractoriotdb-extractorString: iotdb-extractorrequired
extractor.patternpath prefix for filtering time seriesString: any time series prefixoptional: root
extractor.history.enablewhether to sync historical dataBoolean: true, falseoptional: true
extractor.history.start-timestart of synchronizing historical data event time,Include start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]optional: Long.MIN_VALUE
extractor.history.end-timeend of synchronizing historical data event time,Include end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]optional: Long.MAX_VALUE
extractor.realtime.enableWhether to sync realtime dataBoolean: true, falseoptional: true

🚫 extractor.pattern Parameter Description

  • Pattern should use backquotes to modify illegal characters or illegal path nodes, for example, if you want to filter root.`a@b` or root.`123`, you should set the pattern to root.`a@b` or root.`123`(Refer specifically to Timing of single and double quotes and backquotesopen in new window

  • In the underlying implementation, when pattern is detected as root (default value), synchronization efficiency is higher, and any other format will reduce performance.

  • The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter 'extractor.pattern'='root.aligned.1':

    • root.aligned.1TS
    • root.aligned.1TS.`1`
    • root.aligned.100TS

    the data will be synchronized;

    • root.aligned.`1`
    • root.aligned.`123`

    the data will not be synchronized.

❗️start-time, end-time parameter description of extractor.history

  • start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00

a piece of data from production to IoTDB contains two key concepts of time

  • event time: the time when the data is actually produced (or the generation time assigned to the data by the data production system, which is a time item in the data point), also called the event time.
  • arrival time: the time the data arrived in the IoTDB system.

The out-of-order data we often refer to refers to data whose event time is far behind the current system time (or the maximum event time that has been dropped) when the data arrives. On the other hand, whether it is out-of-order data or sequential data, as long as they arrive newly in the system, their arrival time will increase with the order in which the data arrives at IoTDB.

💎 the work of iotdb-extractor can be split into two stages

  1. Historical data extraction: All data with arrival time < current system time when creating the pipe is called historical data
  2. Realtime data extraction: All data with arrival time >= current system time when the pipe is created is called realtime data

The historical data transmission phase and the realtime data transmission phase are executed serially. Only when the historical data transmission phase is completed, the realtime data transmission phase is executed.**

Users can specify iotdb-extractor to:

  • Historical data extraction('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'false'
  • Realtime data extraction('extractor.history.enable' = 'false', 'extractor.realtime.enable' = 'true'
  • Full data extraction('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'true'
  • Disable simultaneous sets extractor.history.enable and extractor.realtime.enable to false

Pre-built Processor Plugin

do-nothing-processor

Function: Do not do anything with the events passed in by the extractor.

keyvaluevalue rangerequired or optional with default
processordo-nothing-processorString: do-nothing-processorrequired

Pre-built Connector Plugin

do-nothing-connector

Function: Does not do anything with the events passed in by the processor.

keyvaluevalue rangerequired or optional with default
connectordo-nothing-connectorString: do-nothing-connectorrequired

Stream Processing Task Management

Create Stream Processing Task

A stream processing task can be created using the CREATE PIPE statement, a sample SQL statement is shown below:

CREATE PIPE <PipeId> -- PipeId is the name that uniquely identifies the sync task
WITH EXTRACTOR (
  -- Default IoTDB Data Extraction Plugin
  'extractor'                    = 'iotdb-extractor',
  -- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery
  'extractor.pattern'            = 'root.timecho',
  -- Whether to extract historical data
  'extractor.history.enable'     = 'true',
  -- Describes the time range of the historical data being extracted, indicating the earliest possible time
  'extractor.history.start-time' = '2011.12.03T10:15:30+01:00',
  -- Describes the time range of the extracted historical data, indicating the latest time
  'extractor.history.end-time'   = '2022.12.03T10:15:30+01:00',
  -- Whether to extract realtime data
  'extractor.realtime.enable'    = 'true',
)
WITH PROCESSOR (
  -- Default data processing plugin, means no processing
  'processor'                    = 'do-nothing-processor',
)
WITH CONNECTOR (
  -- IoTDB data sending plugin with target IoTDB
  'connector'                    = 'iotdb-thrift-connector',
  -- Data service for one of the DataNode nodes on the target IoTDB ip
  'connector.ip'                 = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'connector.port'               = '6667',
)

To create a stream processing task it is necessary to configure the PipeId and the parameters of the three plugin sections:

configuration itemdescriptionRequired or notdefault implementationDefault implementation descriptionWhether to allow custom implementations
pipeIdGlobally uniquely identifies the name of a sync task---
extractorpipe Extractor plugin, for extracting synchronized data at the bottom of the databaseOptionaliotdb-extractorIntegrate all historical data of the database and subsequent realtime data into the sync taskno
processorPipe Processor plugin, for processing dataOptionaldo-nothing-processorno processing of incoming data
connectorPipe Connector plugin,for sending data--

In the example, the iotdb-extractor, do-nothing-processor, and iotdb-thrift-connector plugins are used to build the data synchronisation task. iotdb has other built-in data synchronisation plugins, **see the section "System pre-built data synchronisation plugins" . See the "System Pre-installed Stream Processing Plugin" section.

An example of a minimalist CREATE PIPE statement is as follows:

CREATE PIPE <PipeId> -- PipeId is a name that uniquely identifies the task.
WITH CONNECTOR (
  -- IoTDB data sending plugin with target IoTDB
  'connector'      = 'iotdb-thrift-connector',
  -- Data service for one of the DataNode nodes on the target IoTDB ip
  'connector.ip'   = '127.0.0.1',
  -- Data service port of one of the DataNode nodes of the target IoTDB
  'connector.port' = '6667',
)

The expressed semantics are: synchronise the full amount of historical data and subsequent arrivals of realtime data from this database instance to the IoTDB instance with target 127.0.0.1:6667.

Note:

  • EXTRACTOR and PROCESSOR are optional, if no configuration parameters are filled in, the system will use the corresponding default implementation.

  • The CONNECTOR is a mandatory configuration that needs to be declared in the CREATE PIPE statement for configuring purposes.

  • The CONNECTOR exhibits self-reusability. For different tasks, if their CONNECTOR possesses identical KV properties (where the value corresponds to every key), the system will ultimately create only one instance of the CONNECTOR to achieve resource reuse for connections.

    • For example, there are the following pipe1, pipe2 task declarations:
    CREATE PIPE pipe1
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.thrift.host' = 'localhost',
      'connector.thrift.port' = '9999',
    )
    
    CREATE PIPE pipe2
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.thrift.port' = '9999',
      'connector.thrift.host' = 'localhost',
    )
    
    • Since they have identical CONNECTOR declarations (even if the order of some properties is different), the framework will automatically reuse the CONNECTOR declared by them. Hence, the CONNECTOR instances for pipe1 and pipe2 will be the same.
  • Please note that we should avoid constructing application scenarios that involve data cycle sync (as it can result in an infinite loop):

    • IoTDB A -> IoTDB B -> IoTDB A
    • IoTDB A -> IoTDB A

Start Stream Processing Task

After the successful execution of the CREATE PIPE statement, an instance of the stream processing task is created, but the overall task's running status will be set to STOPPED, meaning the task will not immediately process data.

You can use the START PIPE statement to make the stream processing task start processing data:

START PIPE <PipeId>

Stop Stream Processing Task

Use the STOP PIPE statement to stop the stream processing task from processing data:

STOP PIPE <PipeId>

Delete Stream Processing Task

If a stream processing task is in the RUNNING state, you can use the DROP PIPE statement to stop it and delete the entire task:

DROP PIPE <PipeId>

Before deleting a stream processing task, there is no need to execute the STOP operation.

Show Stream Processing Task

Use the SHOW PIPES statement to view all stream processing tasks:

SHOW PIPES

The query results are as follows:

+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|         ID|          CreationTime |  State|PipeExtractor|PipeProcessor|PipeConnector|ExceptionMessage|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING|          ...|          ...|          ...|            None|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED|          ...|          ...|          ...| TException: ...|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+

You can use <PipeId> to specify the status of a stream processing task you want to see:

SHOW PIPE <PipeId>

Additionally, the WHERE clause can be used to determine if the Pipe Connector used by a specific <PipeId> is being reused.

SHOW PIPES
WHERE CONNECTOR USED BY <PipeId>

Stream Processing Task Running Status Migration

A stream processing task status can transition through several states during the lifecycle of a data synchronization pipe:

  • STOPPED: The pipe is in a stopped state. It can have the following possibilities:
    • After the successful creation of a pipe, its initial state is set to stopped
    • The user manually pauses a pipe that is in normal running state, transitioning its status from RUNNING to STOPPED
    • If a pipe encounters an unrecoverable error during execution, its status automatically changes from RUNNING to STOPPED.
  • RUNNING: The pipe is actively processing data
  • DROPPED: The pipe is permanently deleted

The following diagram illustrates the different states and their transitions:

state migration diagram
state migration diagram

Authority Management

Stream Processing Task

Authority NameDescription
CREATE_PIPERegister task,path-independent
START_PIPEStart task,path-independent
STOP_PIPEStop task,path-independent
DROP_PIPEUninstall task,path-independent
SHOW_PIPESQuery task,path-independent

Stream Processing Task Plugin

Authority NameDescription
CREATE_PIPEPLUGINRegister stream processing task plugin,path-independent
DROP_PIPEPLUGINDelete stream processing task plugin,path-independent
SHOW_PIPEPLUGINSQuery stream processing task plugin,path-independent

Configure Parameters

In iotdb-common.properties :

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.