Skip to main content

IoTDB stream processing framework

About 19 min

IoTDB stream processing framework

The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engine changes, transform changed data, and push transformed data outward.

We call . A stream processing task (Pipe) contains three subtasks:

  • Extract
  • Process
  • Send (Connect)

The stream processing framework allows users to customize the processing logic of three subtasks using Java language and process data in a UDF-like manner.
In a Pipe, the above three subtasks are executed by three plugins respectively, and the data will be processed by these three plugins in turn:
Pipe Extractor is used to extract data, Pipe Processor is used to process data, Pipe Connector is used to send data, and the final data will be sent to an external system.

The model of the Pipe task is as follows:

pipe.png
pipe.png

Describing a data flow processing task essentially describes the properties of Pipe Extractor, Pipe Processor and Pipe Connector plugins.
Users can declaratively configure the specific attributes of the three subtasks through SQL statements, and achieve flexible data ETL capabilities by combining different attributes.

Using the stream processing framework, a complete data link can be built to meet the needs of end-side-cloud synchronization, off-site disaster recovery, and read-write load sub-library*.

Custom stream processing plugin development

Programming development dependencies

It is recommended to use maven to build the project and add the following dependencies in pom.xml. Please be careful to select the same dependency version as the IoTDB server version.

<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>pipe-api</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
</dependency>

Event-driven programming model

The user programming interface design of the stream processing plugin refers to the general design concept of the event-driven programming model. Events are data abstractions in the user programming interface, and the programming interface is decoupled from the specific execution method. It only needs to focus on describing the processing method expected by the system after the event (data) reaches the system.

In the user programming interface of the stream processing plugin, events are an abstraction of database data writing operations. The event is captured by the stand-alone stream processing engine, and is passed to the PipeExtractor plugin, PipeProcessor plugin, and PipeConnector plugin in sequence according to the three-stage stream processing process, and triggers the execution of user logic in the three plugins in turn.

In order to take into account the low latency of stream processing in low load scenarios on the end side and the high throughput of stream processing in high load scenarios on the end side, the stream processing engine will dynamically select processing objects in the operation logs and data files. Therefore, user programming of stream processing The interface requires users to provide processing logic for the following two types of events: operation log writing event TabletInsertionEvent and data file writing event TsFileInsertionEvent.

Operation log writing event (TabletInsertionEvent)

The operation log write event (TabletInsertionEvent) is a high-level data abstraction for user write requests. It provides users with the ability to manipulate the underlying data of write requests by providing a unified operation interface.

For different database deployment methods, the underlying storage structures corresponding to operation log writing events are different. For stand-alone deployment scenarios, the operation log writing event is an encapsulation of write-ahead log (WAL) entries; for a distributed deployment scenario, the operation log writing event is an encapsulation of a single node consensus protocol operation log entry.

For write operations generated by different write request interfaces in the database, the data structure of the request structure corresponding to the operation log write event is also different. IoTDB provides numerous writing interfaces such as InsertRecord, InsertRecords, InsertTablet, InsertTablets, etc. Each writing request uses a completely different serialization method, and the generated binary entries are also different.

The existence of operation log writing events provides users with a unified view of data operations, which shields the implementation differences of the underlying data structure, greatly reduces the user's programming threshold, and improves the ease of use of the function.

/** TabletInsertionEvent is used to define the event of data insertion. */
public interface TabletInsertionEvent extends Event {

  /**
   * The consumer processes the data row by row and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer);

  /**
   * The consumer processes the Tablet directly and collects the results by RowCollector.
   *
   * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
   *     results collected by the RowCollector
   */
  Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
}

Data file writing event (TsFileInsertionEvent)

The data file writing event (TsFileInsertionEvent) is a high-level abstraction of the database file writing operation. It is a data collection of several operation log writing events (TabletInsertionEvent).

The storage engine of IoTDB is LSM structured. When data is written, the writing operation will first be placed into a log-structured file, and the written data will be stored in the memory at the same time. When the memory reaches the control upper limit, the disk flushing behavior will be triggered, that is, the data in the memory will be converted into a database file, and the previously prewritten operation log will be deleted. When the data in the memory is converted into the data in the database file, it will undergo two compression processes: encoding compression and general compression. Therefore, the data in the database file takes up less space than the original data in the memory.

In extreme network conditions, directly transmitting data files is more economical than transmitting data writing operations. It will occupy lower network bandwidth and achieve faster transmission speeds. Of course, there is no free lunch. Computing and processing data in files requires additional file I/O costs compared to directly computing and processing data in memory. However, it is precisely the existence of two structures, disk data files and memory write operations, with their own advantages and disadvantages, that gives the system the opportunity to make dynamic trade-offs and adjustments. It is based on this observation that data files are introduced into the plugin's event model. Write event.

To sum up, the data file writing event appears in the event stream of the stream processing plugin, and there are two situations:

(1) Historical data extraction: Before a stream processing task starts, all written data that has been placed on the disk will exist in the form of TsFile. After a stream processing task starts, when collecting historical data, the historical data will be abstracted using TsFileInsertionEvent;

(2) Real-time data extraction: When a stream processing task is in progress, when the real-time processing speed of operation log write events in the data stream is slower than the write request speed, after a certain progress, the operation log write events that cannot be processed in the future will be persisted. to disk and exists in the form of TsFile. After this data is extracted by the stream processing engine, TsFileInsertionEvent will be used as an abstraction.

/**
 * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
 * which is compressed and encoded, and requires IO cost for computational processing.
 */
public interface TsFileInsertionEvent extends Event {

  /**
   * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
   *
   * @return {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent
   */
  Iterable<TabletInsertionEvent> toTabletInsertionEvents();
}

Custom stream processing plugin programming interface definition

Based on the custom stream processing plugin programming interface, users can easily write data extraction plugins, data processing plugins and data sending plugins, so that the stream processing function can be flexibly adapted to various industrial scenarios.

Data extraction plugin interface

Data extraction is the first stage of the three stages of stream processing data from data extraction to data sending. The data extraction plugin (PipeExtractor) is the bridge between the stream processing engine and the storage engine. It monitors the behavior of the storage engine,
Capture various data write events.

/**
 * PipeExtractor
 *
 * <p>PipeExtractor is responsible for capturing events from sources.
 *
 * <p>Various data sources can be supported by implementing different PipeExtractor classes.
 *
 * <p>The lifecycle of a PipeExtractor is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are
 *       parsed and the validation method {@link PipeExtractor#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeExtractor.
 *   <li>Then the method {@link PipeExtractor#start()} will be called to start the PipeExtractor.
 *   <li>While the collaboration task is in progress, the method {@link PipeExtractor#supply()} will
 *       be called to capture events from sources and then the events will be passed to the
 *       PipeProcessor.
 *   <li>The method {@link PipeExtractor#close()} will be called when the collaboration task is
 *       cancelled (the `DROP PIPE` command is executed).
 * </ul>
 */
public interface PipeExtractor extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeExtractor. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeExtractorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeExtractor#validate(PipeParameterValidator)} is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeExtractor
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * Start the extractor. After this method is called, events should be ready to be supplied by
   * {@link PipeExtractor#supply()}. This method is called after {@link
   * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
   *
   * @throws Exception the user can throw errors if necessary
   */
  void start() throws Exception;

  /**
   * Supply single event from the extractor and the caller will send the event to the processor.
   * This method is called after {@link PipeExtractor#start()} is called.
   *
   * @return the event to be supplied. the event may be null if the extractor has no more events at
   *     the moment, but the extractor is still running for more events.
   * @throws Exception the user can throw errors if necessary
   */
  Event supply() throws Exception;
}

Data processing plugin interface

Data processing is the second stage of the three stages of stream processing data from data extraction to data sending. The data processing plugin (PipeProcessor) is mainly used to filter and transform the data captured by the data extraction plugin (PipeExtractor).
various events.

/**
 * PipeProcessor
 *
 * <p>PipeProcessor is used to filter and transform the Event formed by the PipeExtractor.
 *
 * <p>The lifecycle of a PipeProcessor is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
 *       parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeProcessor.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeConnector. The
 *             following 3 methods will be called: {@link
 *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
 *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
 *             PipeProcessor#process(Event, EventCollector)}.
 *         <li>PipeConnector serializes the events into binaries and send them to sinks.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeProcessor#close() } method will be called.
 * </ul>
 */
public interface PipeProcessor extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeProcessor. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeProcessorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
   * events processing.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeProcessor
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * This method is called to process the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
          throws Exception;

  /**
   * This method is called to process the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
          throws Exception {
    for (final TabletInsertionEvent tabletInsertionEvent :
            tsFileInsertionEvent.toTabletInsertionEvents()) {
      process(tabletInsertionEvent, eventCollector);
    }
  }

  /**
   * This method is called to process the Event.
   *
   * @param event Event to be processed
   * @param eventCollector used to collect result events after processing
   * @throws Exception the user can throw errors if necessary
   */
  void process(Event event, EventCollector eventCollector) throws Exception;
}

Data sending plugin interface

Data sending is the third stage of the three stages of stream processing data from data extraction to data sending. The data sending plugin (PipeConnector) is mainly used to send data processed by the data processing plugin (PipeProcessor).
Various events, it serves as the network implementation layer of the stream processing framework, and the interface should allow access to multiple real-time communication protocols and multiple connectors.

/**
 * PipeConnector
 *
 * <p>PipeConnector is responsible for sending events to sinks.
 *
 * <p>Various network protocols can be supported by implementing different PipeConnector classes.
 *
 * <p>The lifecycle of a PipeConnector is as follows:
 *
 * <ul>
 *   <li>When a collaboration task is created, the KV pairs of `WITH CONNECTOR` clause in SQL are
 *       parsed and the validation method {@link PipeConnector#validate(PipeParameterValidator)}
 *       will be called to validate the parameters.
 *   <li>Before the collaboration task starts, the method {@link
 *       PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} will be called
 *       to config the runtime behavior of the PipeConnector and the method {@link
 *       PipeConnector#handshake()} will be called to create a connection with sink.
 *   <li>While the collaboration task is in progress:
 *       <ul>
 *         <li>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <li>PipeProcessor processes the event and then passes them to the PipeConnector.
 *         <li>PipeConnector serializes the events into binaries and send them to sinks. The
 *             following 3 methods will be called: {@link
 *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
 *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
 *             PipeConnector#transfer(Event)}.
 *       </ul>
 *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
 *       PipeConnector#close() } method will be called.
 * </ul>
 *
 * <p>In addition, the method {@link PipeConnector#heartbeat()} will be called periodically to check
 * whether the connection with sink is still alive. The method {@link PipeConnector#handshake()}
 * will be called to create a new connection with the sink when the method {@link
 * PipeConnector#heartbeat()} throws exceptions.
 */
public interface PipeConnector extends PipePlugin {

  /**
   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
   * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is called.
   *
   * @param validator the validator used to validate {@link PipeParameters}
   * @throws Exception if any parameter is not valid
   */
  void validate(PipeParameterValidator validator) throws Exception;

  /**
   * This method is mainly used to customize PipeConnector. In this method, the user can do the
   * following things:
   *
   * <ul>
   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <li>Set the running configurations in PipeConnectorRuntimeConfiguration.
   * </ul>
   *
   * <p>This method is called after the method {@link
   * PipeConnector#validate(PipeParameterValidator)} is called and before the method {@link
   * PipeConnector#handshake()} is called.
   *
   * @param parameters used to parse the input parameters entered by the user
   * @param configuration used to set the required properties of the running PipeConnector
   * @throws Exception the user can throw errors if necessary
   */
  void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
          throws Exception;

  /**
   * This method is used to create a connection with sink. This method will be called after the
   * method {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is
   * called or will be called when the method {@link PipeConnector#heartbeat()} throws exceptions.
   *
   * @throws Exception if the connection is failed to be created
   */
  void handshake() throws Exception;

  /**
   * This method will be called periodically to check whether the connection with sink is still
   * alive.
   *
   * @throws Exception if the connection dies
   */
  void heartbeat() throws Exception;

  /**
   * This method is used to transfer the TabletInsertionEvent.
   *
   * @param tabletInsertionEvent TabletInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;

  /**
   * This method is used to transfer the TsFileInsertionEvent.
   *
   * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
    for (final TabletInsertionEvent tabletInsertionEvent :
            tsFileInsertionEvent.toTabletInsertionEvents()) {
      transfer(tabletInsertionEvent);
    }
  }

  /**
   * This method is used to transfer the Event.
   *
   * @param event Event to be transferred
   * @throws PipeConnectionException if the connection is broken
   * @throws Exception the user can throw errors if necessary
   */
  void transfer(Event event) throws Exception;
}

Custom stream processing plugin management

In order to ensure the flexibility and ease of use of user-defined plugins in actual production, the system also needs to provide the ability to dynamically and uniformly manage plugins.
The stream processing plugin management statements introduced in this chapter provide an entry point for dynamic unified management of plugins.

Load plugin statement

In IoTDB, if you want to dynamically load a user-defined plugin in the system, you first need to implement a specific plugin class based on PipeExtractor, PipeProcessor or PipeConnector.
Then the plugin class needs to be compiled and packaged into a jar executable file, and finally the plugin is loaded into IoTDB using the management statement for loading the plugin.

The syntax of the management statement for loading the plugin is shown in the figure.

CREATE PIPEPLUGIN <alias>
AS <full class name>
USING <URI of JAR package>

Example: If you implement a data processing plugin named edu.tsinghua.iotdb.pipe.ExampleProcessor, and the packaged jar package is pipe-plugin.jar, you want to use this plugin in the stream processing engine, and mark the plugin as example. There are two ways to use the plug-in package, one is to upload to the URI server, and the other is to upload to the local directory of the cluster.

Method 1: Upload to the URI server

Preparation: To register in this way, you need to upload the JAR package to the URI server in advance and ensure that the IoTDB instance that executes the registration statement can access the URI server. For example https://example.com:8080/iotdb/pipe-plugin.jaropen in new window .

SQL:

SQL CREATE PIPEPLUGIN example 
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor' 
USING URI '<https://example.com:8080/iotdb/pipe-plugin.jar>'

Method 2: Upload the data to the local directory of the cluster

Preparation: To register in this way, you need to place the JAR package in any path on the machine where the DataNode node is located, and we recommend that you place the JAR package in the /ext/pipe directory of the IoTDB installation path (the installation package is already in the installation package, so you do not need to create a new one). For example: iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar. (Note: If you are using a cluster, you will need to place the JAR package under the same path as the machine where each DataNode node is located)

SQL:

SQL CREATE PIPEPLUGIN example 
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor' 
USING URI '<file:/IoTDB installation path/iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar>'

Delete plugin statement

When the user no longer wants to use a plugin and needs to uninstall the plugin from the system, he can use the delete plugin statement as shown in the figure.

DROP PIPEPLUGIN <alias>

View plugin statements

Users can also view plugins in the system on demand. View the statement of the plugin as shown in the figure.

SHOW PIPEPLUGINS

System preset stream processing plugin

Preset extractor plugin

####iotdb-extractor

Function: Extract historical or real-time data inside IoTDB into pipe.

keyvaluevalue rangerequired or notdefault value
sourceiotdb-sourceString: iotdb-sourcerequired-
source.patternPath prefix for filtering time seriesString: any time series prefixoptionalroot
source.history.enableWhether to synchronise history dataBoolean: true, falseoptionaltrue
source.history.start-timeSynchronise the start event time of historical data, including start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]optionalLong.MIN_VALUE
source.history.end-timeend event time for synchronised history data, contains end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]optionalLong.MAX_VALUE
source.realtime.enableWhether to synchronise real-time dataBoolean: true, falseoptionaltrue
source.realtime.modeExtraction mode for real-time dataString: hybrid, stream, batchoptionalhybrid
source.forwarding-pipe-requestsWhether to forward data written by another Pipe (usually Data Sync)Boolean: true, falseoptionaltrue

🚫 extractor.pattern 参数说明

  • Pattern needs to use backticks to modify illegal characters or illegal path nodes. For example, if you want to filter root.`a@b` or root.`123`, you should set pattern to root.`a@b ` or root.`123` (For details, please refer to [When to use single and double quotes and backticks](https://iotdb.apache.org/zh/Download/#_1-0-versionopen in new window incompatible syntax details illustrate))

  • In the underlying implementation, when pattern is detected as root (default value), the extraction efficiency is higher, and any other format will reduce performance.

  • The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter 'extractor.pattern'='root.aligned.1':

  • root.aligned.1TS

  • root.aligned.1TS.`1`

  • root.aligned.100T

The data will be extracted;

  • root.aligned.`1`
  • root.aligned.`123`

The data will not be extracted.

  • The data of root.__system will not be extracted by pipe. Although users can include any prefix in extractor.pattern, including prefixes with (or overriding) root.__system, the data under root.__system will always be ignored by pipe

❗️Start-time, end-time parameter description of extractor.history

  • start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00

A piece of data from production to IoTDB contains two key concepts of time

  • event time: The time when the data is actually produced (or the generation time assigned to the data by the data production system, which is the time item in the data point), also called event time.
  • arrival time: The time when data arrives in the IoTDB system.

What we often call out-of-order data refers to data whose event time is far behind the current system time (or the maximum event time that has been dropped) when the data arrives. On the other hand, whether it is out-of-order data or sequential data, as long as they arrive newly in the system, their arrival time will increase with the order in which the data arrives at IoTDB.

💎 iotdb-extractor’s work can be split into two stages

  1. Historical data extraction: all data with arrival time < current system time when creating pipe is called historical data
  2. Real-time data extraction: all arrival time >= data of current system time when creating pipe is called real-time data

The historical data transmission phase and the real-time data transmission phase are executed serially. Only when the historical data transmission phase is completed, the real-time data transmission phase is executed. **

Users can specify iotdb-extractor to:

  • Historical data extraction ('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'false' )
  • Real-time data extraction ('extractor.history.enable' = 'false', 'extractor.realtime.enable' = 'true' )
  • Full data extraction ('extractor.history.enable' = 'true', 'extractor.realtime.enable' = 'true' )
  • Disable setting extractor.history.enable and extractor.realtime.enable to false at the same time

📌 extractor.realtime.mode: Data extraction mode

  • log: In this mode, the task only uses the operation log for data processing and sending
  • file: In this mode, the task only uses data files for data processing and sending.
  • hybrid: This mode takes into account the characteristics of low latency but low throughput when sending data one by one in the operation log, and the characteristics of high throughput but high latency when sending in batches of data files. It can automatically operate under different write loads. Switch the appropriate data extraction method. First, adopt the data extraction method based on operation logs to ensure low sending delay. When a data backlog occurs, it will automatically switch to the data extraction method based on data files to ensure high sending throughput. When the backlog is eliminated, it will automatically switch back to the data extraction method based on data files. The data extraction method of the operation log avoids the problem of difficulty in balancing data sending delay or throughput using a single data extraction algorithm.

🍕 extractor.forwarding-pipe-requests: Whether to allow forwarding data transmitted from another pipe

  • If you want to use pipe to build data synchronization of A -> B -> C, then the pipe of B -> C needs to set this parameter to true, so that the data written by A to B through the pipe in A -> B can be forwarded correctly. to C
  • If you want to use pipe to build two-way data synchronization (dual-active) of A <-> B, then the pipes of A -> B and B -> A need to set this parameter to false, otherwise the data will be endless. inter-cluster round-robin forwarding

Preset processor plugin

do-nothing-processor

Function: No processing is done on the events passed in by the extractor.

keyvaluevalue rangerequired or optional with default
processordo-nothing-processorString: do-nothing-processorrequired

Preset connector plugin

do-nothing-connector

Function: No processing is done on the events passed in by the processor.

keyvaluevalue rangerequired or optional with default
connectordo-nothing-connectorString: do-nothing-connectorrequired

Stream processing task management

Create a stream processing task

Use the CREATE PIPE statement to create a stream processing task. Taking the creation of a data synchronization stream processing task as an example, the sample SQL statement is as follows:

CREATE PIPE <PipeId> -- PipeId is a name that uniquely identifies the stream processing task
WITH EXTRACTOR (
   --Default IoTDB data extraction plugin
   'extractor' = 'iotdb-extractor',
   --Path prefix, only data that can match the path prefix will be extracted for subsequent processing and sending
   'extractor.pattern' = 'root.timecho',
   -- Whether to extract historical data
   'extractor.history.enable' = 'true',
   -- Describes the time range of the extracted historical data, indicating the earliest time
   'extractor.history.start-time' = '2011.12.03T10:15:30+01:00',
   -- Describes the time range of the extracted historical data, indicating the latest time
   'extractor.history.end-time' = '2022.12.03T10:15:30+01:00',
   -- Whether to extract real-time data
   'extractor.realtime.enable' = 'true',
   --Describe the extraction method of real-time data
   'extractor.realtime.mode' = 'hybrid',
)
WITH PROCESSOR (
   --The default data processing plugin, which does not do any processing
   'processor' = 'do-nothing-processor',
)
WITH CONNECTOR (
   -- IoTDB data sending plugin, the target is IoTDB
   'connector' = 'iotdb-thrift-connector',
   --The data service IP of one of the DataNode nodes in the target IoTDB
   'connector.ip' = '127.0.0.1',
   -- The data service port of one of the DataNode nodes in the target IoTDB
   'connector.port' = '6667',
)

When creating a stream processing task, you need to configure the PipeId and the parameters of the three plugin parts:

ConfigurationDescriptionRequired or notDefault implementationDefault implementation descriptionDefault implementation description
PipeIdA globally unique name that identifies a stream processing---
extractorPipe Extractor plugin, responsible for extracting stream processing data at the bottom of the databaseOptionaliotdb-extractorIntegrate the full historical data of the database and subsequent real-time data arriving into the stream processing taskNo
processorPipe Processor plugin, responsible for processing dataOptionaldo-nothing-processorDoes not do any processing on the incoming data
connectorPipe Connector plugin, responsible for sending data--

In the example, the iotdb-extractor, do-nothing-processor and iotdb-thrift-connector plugins are used to build the data flow processing task. IoTDB also has other built-in stream processing plugins, please check the "System Preset Stream Processing plugin" section.

A simplest example of the CREATE PIPE statement is as follows:

CREATE PIPE <PipeId> -- PipeId is a name that uniquely identifies the stream processing task
WITH CONNECTOR (
   -- IoTDB data sending plugin, the target is IoTDB
   'connector' = 'iotdb-thrift-connector',
   --The data service IP of one of the DataNode nodes in the target IoTDB
   'connector.ip' = '127.0.0.1',
   -- The data service port of one of the DataNode nodes in the target IoTDB
   'connector.port' = '6667',
)

The semantics expressed are: synchronize all historical data in this database instance and subsequent real-time data arriving to the IoTDB instance with the target 127.0.0.1:6667.

Notice:

  • EXTRACTOR and PROCESSOR are optional configurations. If you do not fill in the configuration parameters, the system will use the corresponding default implementation.

  • CONNECTOR is a required configuration and needs to be configured declaratively in the CREATE PIPE statement

  • CONNECTOR has self-reuse capability. For different stream processing tasks, if their CONNECTORs have the same KV attributes (the keys corresponding to the values of all attributes are the same), then the system will only create one CONNECTOR instance in the end to realize the duplication of connection resources. use.

    • For example, there are the following declarations of two stream processing tasks, pipe1 and pipe2:
    CREATE PIPE pipe1
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.thrift.host' = 'localhost',
      'connector.thrift.port' = '9999',
    )
    
    CREATE PIPE pipe2
    WITH CONNECTOR (
      'connector' = 'iotdb-thrift-connector',
      'connector.thrift.port' = '9999',
      'connector.thrift.host' = 'localhost',
    )
    
  • Because their declarations of CONNECTOR are exactly the same (even if the order of declaration of some attributes is different), the framework will automatically reuse the CONNECTORs they declared, and ultimately the CONNECTORs of pipe1 and pipe2 will be the same instance. .

  • When the extractor is the default iotdb-extractor, and extractor.forwarding-pipe-requests is the default value true, please do not build an application scenario that includes data cycle synchronization (it will cause an infinite loop):

    • IoTDB A -> IoTDB B -> IoTDB A
    • IoTDB A -> IoTDB A

Start the stream processing task

After the CREATE PIPE statement is successfully executed, the stream processing task-related instance will be created, but the running status of the entire stream processing task will be set to STOPPED, that is, the stream processing task will not process data immediately.

You can use the START PIPE statement to cause a stream processing task to start processing data:

START PIPE <PipeId>

Stop the stream processing task

Use the STOP PIPE statement to stop the stream processing task from processing data:

STOP PIPE <PipeId>

Delete stream processing tasks

Use the DROP PIPE statement to stop the stream processing task from processing data (when the stream processing task status is RUNNING), and then delete the entire stream processing task:

DROP PIPE <PipeId>

Users do not need to perform a STOP operation before deleting the stream processing task.

Display stream processing tasks

Use the SHOW PIPES statement to view all stream processing tasks:

SHOW PIPES

The query results are as follows:

+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|         ID|          CreationTime |  State|PipeExtractor|PipeProcessor|PipeConnector|ExceptionMessage|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING|          ...|          ...|          ...|            None|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED|          ...|          ...|          ...| TException: ...|
+-----------+-----------------------+-------+-------------+-------------+-------------+----------------+

You can use <PipeId> to specify the status of a stream processing task you want to see:

SHOW PIPE <PipeId>

You can also use the where clause to determine whether the Pipe Connector used by a certain <PipeId> is reused.

SHOW PIPES
WHERE CONNECTOR USED BY <PipeId>

Stream processing task running status migration

A stream processing pipe will pass through various states during its managed life cycle:

  • STOPPED: The pipe is stopped. When the pipeline is in this state, there are several possibilities:
    • When a pipe is successfully created, its initial state is paused.
    • The user manually pauses a pipe that is in normal running status, and its status will passively change from RUNNING to STOPPED.
    • When an unrecoverable error occurs during the running of a pipe, its status will automatically change from RUNNING to STOPPED
  • RUNNING: pipe is working properly
  • DROPPED: The pipe task was permanently deleted

The following diagram shows all states and state transitions:

State migration diagram
State migration diagram

authority management

Stream processing tasks

Permission nameDescription
USE_PIPERegister a stream processing task. The path is irrelevant.
USE_PIPEStart the stream processing task. The path is irrelevant.
USE_PIPEStop the stream processing task. The path is irrelevant.
USE_PIPEOffload stream processing tasks. The path is irrelevant.
USE_PIPEQuery stream processing tasks. The path is irrelevant.

Stream processing task plugin

Permission nameDescription
USE_PIPERegister stream processing task plugin. The path is irrelevant.
USE_PIPEUninstall the stream processing task plugin. The path is irrelevant.
USE_PIPEQuery stream processing task plugin. The path is irrelevant.

Configuration parameters

In iotdb-common.properties:

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.

We use Google Analytics to collect anonymous, aggregated usage information.