Skip to main content

Kafka

...About 1 min

Kafka

Apache Kafkaopen in new window is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Coding Example

kafka Producer Producing Data Java Code Example

    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");
    props.put("key.serializer", StringSerializer.class);
    props.put("value.serializer", StringSerializer.class);
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(
        new ProducerRecord<>(
            "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
    producer.close();

kafka Consumer Receiving Data Java Code Example

    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);
    props.put("auto.offset.reset", "earliest");
    props.put("group.id", "Kafka-Test");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

Example of Java Code Stored in IoTDB Server

    SessionPool pool =
        new SessionPool.Builder()
            .host("127.0.0.1")
            .port(6667)
            .user("root")
            .password("root")
            .maxSize(3)
            .build();
    List<String> datas = new ArrayList<>(records.count());
    for (ConsumerRecord<String, String> record : records) {
      datas.add(record.value());
    }
    int size = datas.size();
    List<String> deviceIds = new ArrayList<>(size);
    List<Long> times = new ArrayList<>(size);
    List<List<String>> measurementsList = new ArrayList<>(size);
    List<List<TSDataType>> typesList = new ArrayList<>(size);
    List<List<Object>> valuesList = new ArrayList<>(size);
    for (String data : datas) {
      String[] dataArray = data.split(",");
      String device = dataArray[0];
      long time = Long.parseLong(dataArray[1]);
      List<String> measurements = Arrays.asList(dataArray[2].split(":"));
      List<TSDataType> types = new ArrayList<>();
      for (String type : dataArray[3].split(":")) {
        types.add(TSDataType.valueOf(type));
      }
      List<Object> values = new ArrayList<>();
      String[] valuesStr = dataArray[4].split(":");
      for (int i = 0; i < valuesStr.length; i++) {
        switch (types.get(i)) {
          case INT64:
            values.add(Long.parseLong(valuesStr[i]));
            break;
          case DOUBLE:
            values.add(Double.parseDouble(valuesStr[i]));
            break;
          case INT32:
            values.add(Integer.parseInt(valuesStr[i]));
            break;
          case TEXT:
            values.add(valuesStr[i]);
            break;
          case FLOAT:
            values.add(Float.parseFloat(valuesStr[i]));
            break;
          case BOOLEAN:
            values.add(Boolean.parseBoolean(valuesStr[i]));
            break;
        }
      }
      deviceIds.add(device);
      times.add(time);
      measurementsList.add(measurements);
      typesList.add(types);
      valuesList.add(values);
    }
    pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);

Copyright © 2024 The Apache Software Foundation.
Apache IoTDB, IoTDB, Apache, the Apache feather logo, and the Apache IoTDB project logo are either registered trademarks or trademarks of The Apache Software Foundation in all countries

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.