小 T 导读:TDengine 3.0 引入了全新的流式计算引擎,既支持时间驱动的流式计算,也支持事件驱动的流式计算。本文将对新的流式计算引擎的语法规则进行详细介绍,方便开发者及企业使用。

TDengine 是一款开源、云原生的时序数据库(Time Series Database,TSDB),专为物联网、工业互联网、金融、IT 运维监控等场景设计并优化。近期发布的 TDengine 3.0,全新的流式计算引擎是其一大亮点。

TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入情况下,提供毫秒级的计算结果延迟。

流式计算可以包含数据过滤,标量函数计算(含 UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。

TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。
下面我们就一起看一下 TDengine 中流式计算相关的 SQL 语法。

流式计算的创建、删除与展示

创建

CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]WATERMARK   time
}

其中 subquery 是 select 普通查询语法的子集:

subquery: SELECT select_listfrom_clause[WHERE condition][PARTITION BY tag_list][window_clause]

支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与 partition by tbname 一起使用:

window_clause: {SESSION(ts_col, tol_val)| STATE_WINDOW(col)| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}

在上述语句中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果有连续两条数据的时间超过 tol_val,则自动开启下一个窗口。窗口的定义与时序数据特色查询中的定义完全相同,详见 TDengine 特色查询。

例如,使用如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。

CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

删除

DROP STREAM [IF NOT EXISTS] stream_name;

仅删除流式计算任务,由流式计算写入的数据不会被删除。

展示

SHOW STREAMS;

若要展示更详细的信息,可以使用:

SELECT * from performance_schema.`perf_streams`;

流式计算的 partition

我们可以使用 PARTITION BY TBNAME 或 PARTITION BY tag 对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。如果不带 PARTITION BY 选项,那所有的数据将写入到一张子表。

流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。

流式计算的触发模式

在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。

对于非窗口计算,流式计算的触发是实时的;对于窗口计算,目前提供如下 3 种触发模式:

  1. AT_ONCE:写入立即触发
  2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 watermark 使用)
  3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 max delay 指定的时间,则触发计算。

由于窗口关闭是由事件时间所决定的,如果因事件流中断、或持续延迟导致事件时间无法更新,可能无法得到最新的计算结果。因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式,MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算。

流式计算的窗口关闭

流式计算以事件时间(插入记录中的时间戳主键)为基准计算窗口关闭,而非以 TDengine 服务器的时间,这样可以避免客户端与服务器时间不一致带来的问题,有效解决乱序数据写入等难题。同时,流式计算还提供了 watermark 来定义容忍的乱序程度。
在创建流时,我们可以在 stream_option 中指定 watermark,它定义了数据乱序的容忍上界。流式计算通过 watermark 来度量对乱序数据的容忍程度,watermark 默认为 0。

T = 最新事件时间 – watermark

每次写入的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。

在上图中,纵轴表示不同时刻,对于不同时刻,我们画出其对应的 TDengine 收到的数据,即为横轴。已知横轴上的数据点表示已经收到的数据,其中蓝色的点表示事件时间(即数据中的时间戳主键)最后的数据,该数据点减去定义的 watermark 时间,就得到乱序容忍的上界 T。所有结束时间小于 T 的窗口都将被关闭(图中以灰色方框标记)。

在 T2 时刻,乱序数据(黄色的点)到达 TDengine,由于有 watermark 的存在,这些数据进入的窗口并未被关闭,因此可以被正确处理。在 T3 时刻,最新事件到达,T 向后推移超过了第二个窗口关闭的时间,该窗口被关闭,乱序数据被正确处理。

但要注意,在 window_close 或 max_delay 模式下,窗口关闭直接影响推送结果。在 at_once 模式下,窗口关闭只与内存占用有关。

流式计算的过期数据处理策略

对于已关闭的窗口,再次落入该窗口中的数据就会被标记为过期数据。TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项指定:

  1. 重新计算,即 IGNORE EXPIRED 0:默认配置,从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果
  2. 直接丢弃,即 IGNORE EXPIRED 1:忽略过期数据

无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。

示例

企业电表的数据经常都是成百上千亿条的,想要将这些分散、凌乱的数据清洗或转换都需要比较长的时间,很难做到高效性和实时性。在如下例子中,通过 TDengine 流计算可以将电表电压大于 220V 的数据清洗掉,然后以 5 秒为窗口整合并计算出每个窗口中电流的最大值,最后将结果输出到指定的数据表中。

创建 Database 和原始数据表

首先准备数据,完成建库、建一张超级表和多张子表操作:

DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);

创建流

create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);

写入数据

insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);

查询以观察结果

taos> select start, end, max_current from current_stream_output_stb;start          |           end           |     max_current      |
===========================================================================2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 |             10.30000 |2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 |             12.60000 |
Query OK, 2 rows in database (0.018762s)

写在最后

如果大家能够运用好 TDengine 3.0 提供的流计算引擎,就不需要再部署其他的第三方流处理系统,这样一来,不仅降低了系统的复杂度,还大大减少了研发和运维成本。在实际操作中应用 TDengine 流计算引擎时,上述的详细语法会带给你很多帮助,如果还产生了其他更为复杂的应用问题,你也可以进入 TDengine 社区向技术人员寻求帮助。


想了解更多 TDengine Database的具体细节,欢迎大家在GitHub上查看相关源代码。

TDengine3.0流式计算引擎语法规则介绍相关推荐

  1. 大数据之Flink流式计算引擎

    Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...

  2. 为什么阿里会选择 Flink 作为新一代流式计算引擎?

    本文由 [AI前线]原创,ID:ai-front,原文链接:t.cn/ROISIr3 [AI前线导读]2017 年 10 月 19日,阿里巴巴的高级技术专家王绍翾(花名"大沙")将 ...

  3. Flink系列之Flink流式计算引擎基础理论

    声明:         文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...

  4. 流式计算strom,Strom解决的问题,实现实时计算系统要解决那些问题,离线计算是什么,流式计算什么,离线和实时计算区别,strom应用场景,Strorm架构图和编程模型(来自学习资料)

    1.背景-流式计算与storm 2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都 ...

  5. java学习笔记20(Lambda表达式、函数式编程、流式计算、练习)

    文章目录 11.3 学习内容 Lambda表达式 Lambda标准格式 格式说明 省略规则 使用前提 函数式接口 预定义的函数式接口 工作内容 任务1 总结&明日计划 11.4 学习内容 流式 ...

  6. Oceanus的实时流式计算实践与优化

    导语 | 随着互联网场景的不断深化发展,业务实时化趋势越来越强,要求也越来越高.特别是在广告推荐.实时大屏监控.实时风控.实时数仓等各业务领域,实时计算已经成为了不可或缺的一环.在大数据技术的不断发展 ...

  7. Java stream流式计算详解

    Java stream流式计算详解 1. Stream概述 1.1 Stream简介 1.2 Stream分类 2. Stream操作 2.1 Stream创建 2.2 Stream无状态操作 2.3 ...

  8. java8/Stream流式计算从入门到精通/函数式编程实战

    摘要:Stream流式计算,本文讲解了Stream流式计算的概念,具体的使用步骤以及源码实现,最后讲解了使用Stream过程中需要注意的事项.Stream在公司项目中被频繁使用,在性能优化上具有广泛的 ...

  9. Flink系列-1、流式计算简介

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 大数据系列文章目录 官方网址:https://flink.apache.org/ 学习资料:h ...

最新文章

  1. winxp运行html代码,关于WinXP系统实现自动化运行的操作技巧
  2. 是我,一行代码三个 Bug!!! | 每日趣闻
  3. java动态添加组件_有关Java Swing动态增加组件
  4. list集合去重复元素
  5. 区块链面试过程中的40个问题
  6. 7.7-9 chage、chpasswd、su
  7. java sql server连接字符串_关于Java:SQL Server的等效jdbc连接字符串
  8. 超酷jQuery进度条加载动画集合
  9. C#中List的排序(Sort)
  10. MAC下go语言的安装和配置
  11. Toontrack Superior Drummer for Mac - 鼓音乐制作工具
  12. pp to write
  13. 如何下载邢台市卫星地图高清版大图
  14. 第二十三天 小丁再战链表
  15. Prometheus监控告警规则
  16. 【哈佛公开课】积极心理学笔记-06乐观主义(下)
  17. graphiz应用一例:欧洲上古和中世纪民族变迁
  18. Halo的Sakura主题
  19. oracle 时间条件 当天,oracle 查询当天数据的sql条件写法
  20. 考研必备100个网站

热门文章

  1. C语言 循环结构 —— while循环
  2. JSON和cJSON
  3. 信息安全 为“智慧城市”保驾护航
  4. 使用ajax方法实现form表单的提交(Ajax和from提交的区别)
  5. 整合Spring Cloud Sleuth
  6. 大数据平台ambari
  7. 零基础学习xlwings,看这篇文章就够了
  8. Arm架构之系统调用
  9. VS RTKLIB调试错误
  10. python图像降采样_OpenCV:十一、图像上采样和降采样