• 流处理批处理都可以用,是非常的方便!

导入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId><version>1.7.0</version>
</dependency>

测试案例

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2TableConversions}
import org.apache.flink.table.api.{Table, TableEnvironment}object TableAPITest {def main(args: Array[String]): Unit = {//创建运行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//返回Scala流org.apache.flink.streaming.api.scala.StreamExecutionEnvironment的org.apache.flink.table.api.scala.StreamTableEnvironment。val tableenv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//读取文本文档val stream: DataStream[String] = env.readTextFile("a.txt")//导入隐式转换import org.apache.flink.api.scala._//处理从文本读取的数据,封装成User类型val stream1: DataStream[User] = stream.map(x => {val arr: Array[String] = x.split(",")User(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4).toLong)})//将给定的DataStream转换为Table。//Table的字段名称是自动从DataStream的类型派生的。//如果流中的数据类型是case class可以直接根据case class的结构生成tableval table: Table = tableenv.fromDataStream(stream1)//查询一下,比如,就查询id,和sex这两列val table1: Table = table.select("id,sex")//将给定的Table转换为指定类型的附加DataStream。//注意,泛型要写对,比如我们上面查询的id是Int类型,sex是String类型,那么toAppendStream这块的泛型要匹配val value1: DataStream[(Int, String)] = table1.toAppendStream[(Int, String)]//打印查询后的内容value1.print()env.execute()}
}case class User(id: Int, sex: String, name: String, age: Int, num: Long)

运行结果

5> (2,女)
13> (5,男)
15> (6,男)
8> (3,女)
3> (1,男)
10> (4,男)

用SQL风格查询

这种查询方式更简单些,举个例子,和上面的运行结果是一样的!

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2TableConversions}
import org.apache.flink.table.api.{Table, TableEnvironment}object SQLAPITest {def main(args: Array[String]): Unit = {//创建运行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//table的环境是包含在上面的执行环境中的,所以从中可以获取table的执行环境val tableenv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//读取文本文档val stream: DataStream[String] = env.readTextFile("a.txt")//导入隐式转换import org.apache.flink.api.scala._//处理从文本读取的数据,封装成User类型val stream1: DataStream[User] = stream.map(x => {val arr: Array[String] = x.split(",")User(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4).toLong)})val table: Table = tableenv.fromDataStream(stream1)//以sql语法的形式查询,没注册表,因为如果向下面这种调用方式的话,会自动注册val result: Table = tableenv.sqlQuery(s"select id,name FROM $table")//table转为stream之后方便打印输出val value: DataStream[(Int, String)] = result.toAppendStream[(Int, String)]//打印查询后的内容value.print()env.execute()}
}

或者也可以先注册表格,然后再查询

tableenv.registerTable("user",table)//user要带个反单引号,因为这个user是flink的关键字,//所以  ,如果表明是其他的名字,可能就不需要加
val result: Table = tableenv.sqlQuery("select id,name FROM `user`")

总结

  • 开发 Table APISQL API,是为了进一步降低开发难度

flink Table API 与SQL入门实战相关推荐

  1. Flink Table API和SQL(下)

    传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...

  2. 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL

    第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...

  3. 2021年大数据Flink(三十):Flink ​​​​​​​Table API  SQL 介绍

    目录 ​​​​​​​Table API & SQL 介绍 为什么需要Table API & SQL ​​​​​​​Table API& SQL发展历程 架构升级 查询处理器的选 ...

  4. Flink学习笔记(十一)Table API 和 SQL

    文章目录 11. Table API 和 SQL 11.1 快速上手 11.1.1 需要依赖 11.1.2 示例 11.2 基本 API 11.2.1 程序架构 11.2.2 创建表环境 11.2.3 ...

  5. Flink的Table API 与SQL的流处理

    1 流处理与SQL的区别   Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...

  6. Flink的Table API 与SQL介绍及调用

    1 概述    DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...

  7. Flink Table Api SQL 初体验,Blink的使用

    概述 Flink具有Table API和SQL-用于统一流和批处理. Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接) ...

  8. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  9. Flink Table API SQL编程指南(自定义Sources Sinks)

    TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...

最新文章

  1. oracle 授权 增删改查权限_linux suid,sgid,sticky-bit三种特殊权限简介
  2. java游戏怎么导入jme3,Java Camera.getProjectionMatrix方法代码示例
  3. myisam和innodb
  4. [渝粤教育] 中国地质大学 材料力学 复习题 (2)
  5. WebService生成客户端代理的工具WSDL参数介绍
  6. Linux学习 - awk使用
  7. 使用Python为中秋节绘制一块美味的月饼
  8. select自定义下拉选择图标
  9. 【二分图】【最大匹配】【匈牙利算法】CODEVS 2776 寻找代表元
  10. ×××的两种组网方式
  11. 从源码解析kube-scheduler默认的配置
  12. 数字图像处理编程入门笔记
  13. Springboot面试杀手锏-自动配置原理
  14. 【读书笔记】见识——吴军
  15. html5在哪编辑器,HTML5文本编辑器推荐-属于Web开发人员的HTML5编辑器
  16. QT教程,QT从入门到实战教程完整版
  17. 成为JAVA(高级)工程师,该学什么
  18. iOS 单元测试- 入门学习2
  19. Legolas工业自动化平台案例 —— 水源地自动化监控系统
  20. saber与matlab联合仿真

热门文章

  1. java 程序打包成jar_把Java程序打包成jar文件包并执行的方法
  2. 北京工业大学微型计算机接口技术考试,汇编语言微机原理及接口技术期末试卷含答案...
  3. 商标注册流程与注意事项 logo 商标注册类型分类解释
  4. linux shell脚本中 if 条件判断
  5. mybatis 不生效 参数_Spring Boot(七):你不能不知道的Mybatis缓存机制
  6. liunx 环境-配置docker阿里云镜像加速
  7. 原型模式 java 深浅_java学习笔记之原型模式及深浅拷贝
  8. 美丽的窗花java分形_“高冷奇葩”原来冰窗花可以这么美
  9. 发牌游戏 java_解析扑克牌游戏发牌算法——java实现
  10. access横向求和sum_数据横向、纵向及交叉求和,同事用Alt+=号一键搞定!不需要函数...