跳至主要內容

大约 8 分钟

连续查询(Continuous Query, CQ)

我们可以通过 SQL 语句注册、或卸载一个 CQ 实例,以及查询到所有已经注册的 CQ 配置信息。

注意,目前连续查询尚未对分布式场景进行适配。敬请期待分布式版本。

SQL 语句

创建 CQ

语法
CREATE (CONTINUOUS QUERY | CQ) <cq_id> 
[RESAMPLE EVERY <every_interval> FOR <for_interval> BOUNDARY <execution_boundary_time>] 
BEGIN 
SELECT <function>(<path_suffix>) INTO <full_path> | <node_name>
FROM <path_prefix>
GROUP BY time(<group_by_interval>) [, level = <level>] 
END

其中:

  • <cq_id> 指定 CQ 全局唯一的 id。

  • <every_interval> 指定查询执行时间间隔,支持 ns、us、ms、s、m、h、d、w 等单位,其值不应小于用户所配置的 continuous_query_min_every_interval 值。可选择指定。

  • <for_interval> 指定每次查询的窗口大小,即查询时间范围为[now() - <for_interval>, now()),其中 now() 指查询时的时间戳。支持 ns、us、ms、s、m、h、d、w 等单位。可选择指定。

  • <execution_boundary_time> 是一个日期参数,表示第一个窗口的起始时间

    • <execution_boundary_time> 可早于、等于、晚于当前时间
    • 该参数可选择指定,不指定的情况下等价于输入 BOUNDARY now()
    • 第一个窗口的结束时间<execution_boundary_time> + <for_interval>
    • i (1 <= i) 个窗口的开始时间 <execution_boundary_time> + <for_interval> + (i - 1) * <every_interval>
    • i (1 <= i) 个窗口的结束时间 <execution_boundary_time> + <for_interval> + i * <every_interval>
    • 如果当前时间小于等于第一个窗口的结束时间 ,那么连续查询的第一个执行时刻为第一个窗口的结束时间
    • 如果当前时间大于第一个窗口的结束时间,那么连续查询的第一个执行时刻为第一个大于等于当前时间窗口结束时间
    • 每一个执行时刻执行的查询时间范围[now() - <for_interval>, now())
  • <function> 指定聚合函数,目前支持 count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value 等。

  • <path_prefix><path_suffix> 拼接成完整的查询原时间序列。

  • <full_path><node_name> 指定将查询出的数据写入的结果序列路径。

  • <group_by_interval> 指定时间分组长度,支持 ns、us、ms、s、m、h、d、w、mo、y 等单位。

  • <level>指按照序列第 <level> 层分组,将第 <level> 层同名的所有序列聚合。Group By Level 语句的具体语义及 <level> 的定义见 路径层级分组聚合

注:

  • <for_interval>, <every_interval> 可选择指定。如果用户没有指定其中的某一项,则未指定项的值按照<group_by_interval> 处理。
    • <every_interval><for_interval><group_by_interval> 的值均应大于 0。
    • <group_by_interval> 的值应小于<for_interval>的值,否则系统会按照等于<for_interval>的值处理。
    • 用户应当结合实际需求指定合适的 <for_interval><every_interval>
      • <for_interval> 大于 <every_interval>,每次的查询窗口会有部分数据重叠,从查询性能角度这种配置不被建议。
      • <for_interval> 小于 <every_interval>,每次的查询窗口之间可能会有未覆盖到的数据。
  • 对于结果序列路径
    • 用户可以选择指定<full_path>,即以 root 开头的完整的时间序列路径,用户可以在路径中使用 ${x} 变量来表示原始时间序列中 level = x 的节点名称,x应当大于等于 1 且小于等于 <level>
      (若未指定 level,则应小于等于 <path_prefix> 长度)。
    • 用户也可以仅指定<node_name>,即生成时间序列路径的最后一个结点名。
      • 若用户指定 <level> = l,则系统生成的结果时间序列路径为 root.${1}. ... .${l}.<node_name>
      • 若用户未指定 <level>,令原始时间序列最大层数为 L
        则系统生成的结果时间序列路径为 root.${1}. ... .${L - 1}.<node_name>
示例
原始时间序列
+-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
|                   timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
|root.ln.wf02.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf02.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf01.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf01.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|                         Time|root.ln.wf02.wt02.temperature|root.ln.wf02.wt01.temperature|root.ln.wf01.wt02.temperature|root.ln.wf01.wt01.temperature|
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|2021-05-11T22:18:14.598+08:00|                        121.0|                         72.0|                        183.0|                        115.0|
|2021-05-11T22:18:19.941+08:00|                          0.0|                         68.0|                         68.0|                        103.0|
|2021-05-11T22:18:24.949+08:00|                        122.0|                         45.0|                         11.0|                         14.0|
|2021-05-11T22:18:29.967+08:00|                         47.0|                         14.0|                         59.0|                        181.0|
|2021-05-11T22:18:34.979+08:00|                        182.0|                        113.0|                         29.0|                        180.0|
|2021-05-11T22:18:39.990+08:00|                         42.0|                         11.0|                         52.0|                         19.0|
|2021-05-11T22:18:44.995+08:00|                         78.0|                         38.0|                        123.0|                         52.0|
|2021-05-11T22:18:49.999+08:00|                        137.0|                        172.0|                        135.0|                        193.0|
|2021-05-11T22:18:55.003+08:00|                         16.0|                        124.0|                        183.0|                         18.0|
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
结果序列配置举例说明

对于以上原始时间序列,若用户指定查询聚合层级为 2,聚合函数为 avg
用户可以在 INTO 语句中仅指定生成序列的最后一个结点名,若用户将其指定为 temperature_avg,则系统生成的完整路径为 root.${1}.${2}.temperature_avg
用户也可以在 INTO 语句中指定完整写入路径,用户可将其指定为 root.${1}.${2}.temperature_avgroot.ln_cq.${2}.temperature_avgroot.${1}_cq.${2}.temperature_avgroot.${1}.${2}_cq.temperature_avg等,
也可以按需要指定为 root.${2}.${1}.temperature_avg 等其它形式。
需要注意的是,${x} 中的 x 应当大于等于 1 且小于等于 <level>
(若未指定 <level>,则应小于等于 <path_prefix> 层级)。在上例中,x 应当小于等于 2

创建 cq1
CREATE CONTINUOUS QUERY cq1 
BEGIN 
  SELECT max_value(temperature) 
  INTO temperature_max 
  FROM root.ln.*.* 
  GROUP BY time(10s) 
END

每隔 10s 查询 root.ln.*.*.temperature 在前 10s 内的最大值(结果以 10s 为一组),
将结果写入到 root.${1}.${2}.${3}.temperature_max 中,
结果将产生 4 条新序列:

+---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
|                       timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
|root.ln.wf02.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf02.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf01.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
|root.ln.wf01.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|                         Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max|
+-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|2021-05-11T22:18:16.964+08:00|                            122.0|                             68.0|                             68.0|                            103.0|
|2021-05-11T22:18:26.964+08:00|                            182.0|                            113.0|                             59.0|                            181.0|
|2021-05-11T22:18:36.964+08:00|                             78.0|                             38.0|                            123.0|                             52.0|
|2021-05-11T22:18:46.964+08:00|                            137.0|                            172.0|                            183.0|                            193.0|
+-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
创建 cq2
CREATE CONTINUOUS QUERY cq2 
RESAMPLE EVERY 20s FOR 20s 
BEGIN 
  SELECT avg(temperature) 
  INTO temperature_avg 
  FROM root.ln.*.* 
  GROUP BY time(10s), level=2 
END

每隔 20s 查询 root.ln.*.*.temperature 在前 20s 内的平均值(结果以 10s 为一组,按照第 2 层节点分组),
将结果写入到 root.${1}.${2}.temperature_avg 中。
结果将产生如下两条新序列,
其中 root.ln.wf02.temperature_avgroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt01.temperature 聚合计算生成,
root.ln.wf01.temperature_avgroot.ln.wf01.wt02.temperatureroot.ln.wf01.wt01.temperature 聚合计算生成。

+----------------------------+-----+-------------+--------+--------+-----------+----+----------+
|                  timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+----------------------------+-----+-------------+--------+--------+-----------+----+----------+
|root.ln.wf02.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
|root.ln.wf01.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+-----------------------------+----------------------------+----------------------------+
|                         Time|root.ln.wf02.temperature_avg|root.ln.wf01.temperature_avg|
+-----------------------------+----------------------------+----------------------------+
|2021-05-11T22:18:16.969+08:00|                       58.75|                        49.0|
|2021-05-11T22:18:26.969+08:00|                        89.0|                      112.25|
|2021-05-11T22:18:36.969+08:00|                       42.25|                        61.5|
|2021-05-11T22:18:46.969+08:00|                      112.25|                      132.25|
+-----------------------------+----------------------------+----------------------------+
创建 cq3
CREATE CONTINUOUS QUERY cq3 
RESAMPLE EVERY 20s FOR 20s 
BEGIN 
  SELECT avg(temperature) 
  INTO root.ln_cq.${2}.temperature_avg 
  FROM root.ln.*.* 
  GROUP BY time(10s), level=2 
END

查询模式与 cq2 相同,在这个例子中,用户自行指定结果写入到 root.ln_cq.${2}.temperature_avg 中。
结果将产生如下两条新序列,
其中 root.ln_cq.wf02.temperature_avgroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt01.temperature 聚合计算生成,
root.ln_cq.wf01.temperature_avgroot.ln.wf01.wt02.temperatureroot.ln.wf01.wt01.temperature 聚合计算生成。

+-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
|                     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
|root.ln_cq.wf02.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
|root.ln_cq.wf01.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+-----------------------------+-------------------------------+-------------------------------+
|                         Time|root.ln_cq.wf02.temperature_avg|root.ln_cq.wf01.temperature_avg|
+-----------------------------+-------------------------------+-------------------------------+
|2021-05-11T22:18:16.971+08:00|                          58.75|                           49.0|
|2021-05-11T22:18:26.971+08:00|                           89.0|                         112.25|
|2021-05-11T22:18:36.971+08:00|                          42.25|                           61.5|
|2021-05-11T22:18:46.971+08:00|                         112.25|                         132.25|
+-----------------------------+-------------------------------+-------------------------------+
创建 cq4
CREATE CONTINUOUS QUERY cq4 
RESAMPLE EVERY 20s FOR 20s BOUNDARY 2022-01-14T23:00:00.000+08:00 
BEGIN 
  SELECT avg(temperature) 
  INTO root.ln_cq.${2}.temperature_avg 
  FROM root.ln.*.* GROUP BY time(10s), level=2 
END

这个例子与创建 cq3 几乎完全相同。不同的是,在这个例子中用户自行指定了 BOUNDARY 2022-01-14T23:00:00.000+08:00

注意这个 CQ 的第一个执行时刻大于例子中的时间,因此 2022-01-14T23:00:20.000+08:00 为第一个执行时刻。递推地,2022-01-14T23:00:40.000+08:00 为第二个执行时刻,2022-01-14T23:01:00.000+08:00 为第三个执行时刻……

第一个执行时刻执行的 SQL 语句为 select avg(temperature) from root.ln.*.* group by ([2022-01-14T23:00:00.000+08:00, 2022-01-14T23:00:20.000+08:00), 10s), level = 2

第二个执行时刻执行的 SQL 语句为 select avg(temperature) from root.ln.*.* group by ([2022-01-14T23:00:20.000+08:00, 2022-01-14T23:00:40.000+08:00), 10s), level = 2

第三个执行时刻执行的 SQL 语句为 select avg(temperature) from root.ln.*.* group by ([2022-01-14T23:00:40.000+08:00, 2022-01-14T23:01:00.000+08:00), 10s), level = 2

……

展示 CQ 信息

语法
SHOW (CONTINUOUS QUERIES | CQS) 
结果示例
+-------+--------------+------------+-------------+----------------------------------------------------------------------------------------+-----------------------------------+
|cq name|every interval|for interval|     boundary|                                                                               query sql|                        target path|
+-------+--------------+------------+-------------+----------------------------------------------------------------------------------------+-----------------------------------+
|    cq1|         10000|       10000|1642166102238|     select max_value(temperature) from root.ln.*.* group by ([now() - 10s, now()), 10s)|root.${1}.${2}.${3}.temperature_max|
|    cq3|         20000|       20000|1642166118339|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|    root.ln_cq.${2}.temperature_avg|
|    cq2|         20000|       20000|1642166111493|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|     root.${1}.${2}.temperature_avg|
|    cq4|         20000|       20000|1642172400000|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|    root.ln_cq.${2}.temperature_avg|
+-------+--------------+------------+-------------+----------------------------------------------------------------------------------------+-----------------------------------+

删除 CQ

语法
DROP (CONTINUOUS QUERY | CQ) <cq_id> 
示例
DROP CONTINUOUS QUERY cq3
DROP CQ cq3

系统参数配置

参数名描述数据类型默认值
continuous_query_execution_thread执行连续查询任务的线程池的线程数intmax(1, CPU 核数 / 2)
max_pending_continuous_query_tasks队列中连续查询最大任务堆积数int64
continuous_query_min_every_interval连续查询执行时间间隔的最小值duration1s

Copyright © 2024 The Apache Software Foundation.
Apache and the Apache feather logo are trademarks of The Apache Software Foundation

Have a question? Connect with us on QQ, WeChat, or Slack. Join the community now.