flink Table API 与SQL入门实战
流处理
和批处理
都可以用,是非常的方便!
导入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId><version>1.7.0</version>
</dependency>
测试案例
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2TableConversions}
import org.apache.flink.table.api.{Table, TableEnvironment}object TableAPITest {def main(args: Array[String]): Unit = {//创建运行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//返回Scala流org.apache.flink.streaming.api.scala.StreamExecutionEnvironment的org.apache.flink.table.api.scala.StreamTableEnvironment。val tableenv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//读取文本文档val stream: DataStream[String] = env.readTextFile("a.txt")//导入隐式转换import org.apache.flink.api.scala._//处理从文本读取的数据,封装成User类型val stream1: DataStream[User] = stream.map(x => {val arr: Array[String] = x.split(",")User(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4).toLong)})//将给定的DataStream转换为Table。//Table的字段名称是自动从DataStream的类型派生的。//如果流中的数据类型是case class可以直接根据case class的结构生成tableval table: Table = tableenv.fromDataStream(stream1)//查询一下,比如,就查询id,和sex这两列val table1: Table = table.select("id,sex")//将给定的Table转换为指定类型的附加DataStream。//注意,泛型要写对,比如我们上面查询的id是Int类型,sex是String类型,那么toAppendStream这块的泛型要匹配val value1: DataStream[(Int, String)] = table1.toAppendStream[(Int, String)]//打印查询后的内容value1.print()env.execute()}
}case class User(id: Int, sex: String, name: String, age: Int, num: Long)
运行结果
5> (2,女)
13> (5,男)
15> (6,男)
8> (3,女)
3> (1,男)
10> (4,男)
用SQL风格查询
这种查询方式更简单些,举个例子,和上面的运行结果是一样的!
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2TableConversions}
import org.apache.flink.table.api.{Table, TableEnvironment}object SQLAPITest {def main(args: Array[String]): Unit = {//创建运行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//table的环境是包含在上面的执行环境中的,所以从中可以获取table的执行环境val tableenv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//读取文本文档val stream: DataStream[String] = env.readTextFile("a.txt")//导入隐式转换import org.apache.flink.api.scala._//处理从文本读取的数据,封装成User类型val stream1: DataStream[User] = stream.map(x => {val arr: Array[String] = x.split(",")User(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4).toLong)})val table: Table = tableenv.fromDataStream(stream1)//以sql语法的形式查询,没注册表,因为如果向下面这种调用方式的话,会自动注册val result: Table = tableenv.sqlQuery(s"select id,name FROM $table")//table转为stream之后方便打印输出val value: DataStream[(Int, String)] = result.toAppendStream[(Int, String)]//打印查询后的内容value.print()env.execute()}
}
或者也可以先注册表格,然后再查询
tableenv.registerTable("user",table)//user要带个反单引号,因为这个user是flink的关键字,//所以 ,如果表明是其他的名字,可能就不需要加
val result: Table = tableenv.sqlQuery("select id,name FROM `user`")
总结
- 开发
Table API
与SQL API
,是为了进一步降低开发难度
flink Table API 与SQL入门实战相关推荐
- Flink Table API和SQL(下)
传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...
- 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL
第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...
- 2021年大数据Flink(三十):Flink Table API SQL 介绍
目录 Table API & SQL 介绍 为什么需要Table API & SQL Table API& SQL发展历程 架构升级 查询处理器的选 ...
- Flink学习笔记(十一)Table API 和 SQL
文章目录 11. Table API 和 SQL 11.1 快速上手 11.1.1 需要依赖 11.1.2 示例 11.2 基本 API 11.2.1 程序架构 11.2.2 创建表环境 11.2.3 ...
- Flink的Table API 与SQL的流处理
1 流处理与SQL的区别 Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...
- Flink的Table API 与SQL介绍及调用
1 概述 DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...
- Flink Table Api SQL 初体验,Blink的使用
概述 Flink具有Table API和SQL-用于统一流和批处理. Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接) ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Flink Table API SQL编程指南(自定义Sources Sinks)
TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...
最新文章
- oracle 授权 增删改查权限_linux suid,sgid,sticky-bit三种特殊权限简介
- java游戏怎么导入jme3,Java Camera.getProjectionMatrix方法代码示例
- myisam和innodb
- [渝粤教育] 中国地质大学 材料力学 复习题 (2)
- WebService生成客户端代理的工具WSDL参数介绍
- Linux学习 - awk使用
- 使用Python为中秋节绘制一块美味的月饼
- select自定义下拉选择图标
- 【二分图】【最大匹配】【匈牙利算法】CODEVS 2776 寻找代表元
- ×××的两种组网方式
- 从源码解析kube-scheduler默认的配置
- 数字图像处理编程入门笔记
- Springboot面试杀手锏-自动配置原理
- 【读书笔记】见识——吴军
- html5在哪编辑器,HTML5文本编辑器推荐-属于Web开发人员的HTML5编辑器
- QT教程,QT从入门到实战教程完整版
- 成为JAVA(高级)工程师,该学什么
- iOS 单元测试- 入门学习2
- Legolas工业自动化平台案例 —— 水源地自动化监控系统
- saber与matlab联合仿真
热门文章
- java 程序打包成jar_把Java程序打包成jar文件包并执行的方法
- 北京工业大学微型计算机接口技术考试,汇编语言微机原理及接口技术期末试卷含答案...
- 商标注册流程与注意事项 logo 商标注册类型分类解释
- linux shell脚本中 if 条件判断
- mybatis 不生效 参数_Spring Boot(七):你不能不知道的Mybatis缓存机制
- liunx 环境-配置docker阿里云镜像加速
- 原型模式 java 深浅_java学习笔记之原型模式及深浅拷贝
- 美丽的窗花java分形_“高冷奇葩”原来冰窗花可以这么美
- 发牌游戏 java_解析扑克牌游戏发牌算法——java实现
- access横向求和sum_数据横向、纵向及交叉求和,同事用Alt+=号一键搞定!不需要函数...