大约 2 分钟
Flink IoTDB 连接器
IoTDB 与 Apache Flink 的集成。此模块包含了 iotdb sink,允许 flink job 将时序数据写入 IoTDB。
IoTDBSink
使用 IoTDBSink
,您需要定义一个 IoTDBOptions
和一个 IoTSerializationSchema
实例。 IoTDBSink
默认每次发送一个数据,可以通过调用 withBatchSize(int)
进行调整。
示例
该示例演示了如下从一个 Flink job 中发送数据到 IoTDB server 的场景:
一个模拟的 Source
SensorSource
每秒钟产生一个数据点。Flink 使用
IoTDBSink
消费产生的数据并写入 IoTDB 。import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import com.google.common.collect.Lists; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.Random; public class FlinkIoTDBSink { public static void main(String[] args) throws Exception { // run the flink job on local mini cluster StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IoTDBOptions options = new IoTDBOptions(); options.setHost("127.0.0.1"); options.setPort(6667); options.setUser("root"); options.setPassword("root"); options.setStorageGroup("root.sg"); // If the server enables auto_create_schema, then we do not need to register all timeseries // here. options.setTimeseriesOptionList( Lists.newArrayList( new IoTDBOptions.TimeseriesOption( "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY))); IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema) // enable batching .withBatchSize(10) // how many connectons to the server will be created for each parallelism .withSessionPoolSize(3); env.addSource(new SensorSource()) .name("sensor-source") .setParallelism(1) .addSink(ioTDBSink) .name("iotdb-sink"); env.execute("iotdb-flink-example"); } private static class SensorSource implements SourceFunction<Map<String, String>> { boolean running = true; Random random = new SecureRandom(); @Override public void run(SourceContext context) throws Exception { while (running) { Map<String, String> tuple = new HashMap(); tuple.put("device", "root.sg.d1"); tuple.put("timestamp", String.valueOf(System.currentTimeMillis())); tuple.put("measurements", "s1"); tuple.put("types", "DOUBLE"); tuple.put("values", String.valueOf(random.nextDouble())); context.collect(tuple); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
运行方法
- 启动 IoTDB server
- 运行
org.apache.iotdb.flink.FlinkIoTDBSink.java
将 Flink job 运行在本地的集群上。