flink版本1.14

flinksql 来源于kafka json格式数据

变化的表

业务中sql可能不完全满足使用,需要转换成DataStream 更灵活一些,所以需要互相转换,发挥各自的优势。

         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toDataStream(tableResult, Row.class);dataStream.flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

会发现报错

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

那是因为在table-to-stream中,数据在发生变化,因此需要用toChangelogStream来转换

修改成如下内容:

         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toChangelogStream(tableResult).flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

在网上找了一圈,没有类似的文章,自己做个记录。
这里虽然也可以用createTemporaryView来把表转换成DataStream,但是这种方式的表是固定的,在实际应用的使用场景中,还是toChangelogStream的应用更广一些。

executeSql() 与 sqlQuery()

你可能会发现 有的地方用的是executeSql() 有的地方用的是 sqlQuery() ,这两者是什么不同呢,对此我特意去看了一下源码里的注释。

sqlQuery(String query)

针对sqlQuery()他是这么说的

评估对已注册表的 SQL 查询并返回一个 Table 对象,该对象描述了进一步转换的管道。查询引用的所有表和其他对象都必须在 TableEnvironment 中注册。例如,使用 createTemporaryView(String, Table)) 来引用 Table 对象或使用 createTemporarySystemFunction(String, Class) 来引用函数。
或者,当调用 Table.toString() 方法时,会自动注册 Table 对象,例如当它被嵌入到字符串中时。因此,SQL 查询可以直接内联(即匿名)引用 Table 对象,如下所示:
Table table = ...;
String tableName = table.toString();
// 表没有注册到表环境
tEnv.sqlQuery("SELECT * FROM " + tableName + " WHERE a > 12");
请注意,返回的 Table 是一个 API 对象,仅包含管道描述。它实际上对应于 SQL 术语中的视图。调用 Table.execute() 触发执行或直接使用 executeSql(String)。

executeSql(String statement)

执行给定的单个语句并返回执行结果。
语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。对于 DML 和 DQL,此方法在提交作业后返回 TableResult。对于 DDL 和 DCL 语句,操作完成后将返回 TableResult。
如果多个管道应将数据作为单个执行的一部分插入到一个或多个接收器表中,请使用 StatementSet(请参阅 createStatementSet())。
默认情况下,所有 DML 操作都是异步执行的。使用 TableResult.await() 或 TableResult.getJobClient() 来监控执行。设置 TableConfigOptions.TABLE_DML_SYNC 以始终同步执行。

需要强调的是,这两个是不能强行互相转换的,出现以下报错

java.lang.ClassCastException:org.apache.flink.table.api.internal.TableResultImpl cannot be cast to org.apache.flink.table.api.Table

还有一点, sqlQuery(String query)不能执行复杂的语句,会出现以下报错

Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.

固定的表

如果需求只是一个固定的表可以通过这种下面的案例

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// connector doris的一个表 PerCountNametEnv.executeSql(CreateTableSQL.PerCountNDoris);Table perCountName = tEnv.from("PerCountName");// PerCountName表对应的实体类 PerCountName.classtEnv.toDataStream(perCountName, PerCountName.class).flatMap(new FlatMapFunction<PerCountName, Object>() {@Overridepublic void flatMap(PerCountName value, Collector<Object> out) throws Exception {String s = value.getName()+"->"+value.getNum();out.collect(s);}}).print();env.execute();

flinkSQL Table转DataStream相关推荐

  1. Flink Table 和 DataStream 转换

    文章目录 Flink Table 和 DataStream 转换 1. 表(Table) 转换为 流(DataStream) 1.1 处理(仅插入)流 1.1.1 fromDataStream()方法 ...

  2. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  3. 干货 | 五千字长文带你快速入门FlinkSQL

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  4. Flink的Table API 与SQL介绍及调用

    1 概述    DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...

  5. Flink入门第十二课:DataStream api/Flink sql实现每隔5分钟统计最近一小时热门商品小案例

    用到的数据文件 用到的数据文件 链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 提取码:2hmu 1.需求 & 数据 用户行为数据不断写入 ...

  6. AppendStream和RetractStream(没有弄完)

    Reference [1]Flink RetractStream示例及UDF函数实现 [2]flink实战--flinkSQL 追加模式与缩进模式(toRetractStream)的区别 [3]Fli ...

  7. 95-910-140-源码-FlinkSQL-FlinkSQL简介

    文章目录 1.世界 2.概述 3.FlinkSQL执行计划 4.Table与DataStream和DataSet API集成 5.**Scala的隐式转换** 6.将DataStream或DataSe ...

  8. Flink学习4-流式SQL

    Flink学习4-流式SQL Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 介绍Flink Table Sql API相关概念,还会提供 ...

  9. 蝉联 Apache 最活跃项目,Flink 社区是如何保持高速发展的?

    简介:2020 年是 Apache Flink 社区生态加速繁荣的一年. 本文由 Apache Flink 中文社区发起人,阿里云计算平台事业部实时计算与开放平台部门负责人王峰分享,主要介绍 Flin ...

最新文章

  1. java objectoutputstream怎么用_java序列化与ObjectOutputStream和ObjectInputStream的实例详解...
  2. notepad++打开一个某个工程目录
  3. 中石油训练赛 - High Load Database(二分+记忆化)
  4. 微型计算机技术及应用选择题,微机(微型计算机技术及应用)选择题及答案(最终版).docx...
  5. leetcode 446. Arithmetic Slices II - Subsequence | 446. 等差数列划分 II - 子序列(动态规划)
  6. Apache日志Shell分析
  7. 浅析基于 Serverless 的前后端一体化框架
  8. 50道基础的java面试题
  9. 安装flash player提示版本不是最新,无法安装
  10. 剑指Offer读书笔记(持续更新中)
  11. tomcat 配置文件 conf/server.xml 中的 appBase和docBase
  12. 中山大学计算机软件专业,【广州日报】中山大学在珠海校区新成立人工智能学院和软件工程学院...
  13. 计算机应用基础在线3,《计算机应用基础》第3阶段在线作业3.docx
  14. Spring学习之IOC容器(二)
  15. 【C++】Lambda 表达式详解
  16. Win32中设置窗体失去焦点,并重新获取焦点
  17. elasticsearch: 查询过滤某个字段值的长度
  18. 【数据库】Windows下如何安装MySql
  19. IDEA快捷键之“跳入接口实现类”
  20. vijos1404 遭遇战

热门文章

  1. XCTF 华为云专场 qemuzzz
  2. C语言编写一个函数,实现计算并返回一个整数的平方(或立方)
  3. AdB android 投屏 usb,QtScrcpy: Android实时投屏软件,此应用程序提供USB(或通过TCP/IP)连接的Android设备的显示和控制。它不需要任何root访问权限...
  4. 计算机系统处理机,处理机
  5. prent()和prents()的区别
  6. oracle ORA-28002:the password will expire within 7 days 解决方法
  7. python十字坐标轴绘制_matplotlib画十字坐标图
  8. Spring(eclipse)简要笔记
  9. linux shell编写脚本,执行命令同时操作多台主机
  10. single-shot detection(SSD)目标检测算法详解——(一看就懂系列!!!)