Flink1.7.2 sql 批处理示例
Flink1.7.2 sql 批处理示例
源码
- https://github.com/opensourceteams/flink-maven-scala
概述
- 本文为Flink sql Dataset 示例
- 主要操作包括:Scan / Select,as (table),as (column),limit,Where / Filter,between and (where),Sum,min,max,avg,
- (group by ),group by having,distinct,INNER JOIN,left join,right join,full outer join,union,unionAll,INTERSECT
in,EXCEPT,insert into
SELECT
Scan / Select
- 功能描述: 查询一个表中的所有数据
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scanimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)tableEnv.sqlQuery(s"select name,age FROM user1").first(100).print()/*** 输出结果** 小明,15* 小王,45* 小李,25* 小慧,35*/}}
- 输出结果
小明,15
小王,45
小李,25
小慧,35
as (table)
- 功能描述: 给表名取别称
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scanimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1").first(100).print()/*** 输出结果** 小明,15* 小王,45* 小李,25* 小慧,35*/}}
- 输出结果
小明,15
小王,45
小李,25
小慧,35
as (column)
- 功能描述: 给表名取别称
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scanimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)tableEnv.sqlQuery(s"select name a,age as b FROM user1 ").first(100).print()/*** 输出结果** 小明,15* 小王,45* 小李,25* 小慧,35*/}}
- 输出结果
小明,15
小王,45
小李,25
小慧,35
limit
- 功能描述:查询一个表的数据,只返回指定的前几行(争对并行度而言,所以并行度不一样,结果不一样)
- scala 程序
package com.opensourceteams.mo`dule.bigdata.flink.example.sql.dataset.operations.limitimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(2)val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)/*** 先排序,按age的降序排序,输出前100位结果,注意是按同一个并行度中的数据进行排序,也就是同一个分区*/tableEnv.sqlQuery(s"select name,age FROM user1 ORDER BY age desc LIMIT 100 ").first(100).print()/*** 输出结果 并行度设置为2** 小明,15* 小王,45* 小慧,35* 小李,25*//*** 输出结果 并行度设置为1** 小王,45* 小慧,35* 小李,25* 小明,15*/}}
- 输出结果
小明,15
小王,45
小慧,35
小李,25
Where / Filter
- 功能描述:列加条件过滤表中的数据
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = '女'").first(100).print()/*** 输出结果* * 小李,25,女* 小慧,35,女*/}}
- 输出结果
小李,25,女
小慧,35,女
between and (where)
- 功能描述: 过滤列中的数据, 开始数据 <= data <= 结束数据
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAndimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and 35").first(100).print()/*** 结果** 小李,25,女* 小慧,35,女*/}}
- 输出结果
小李,25,女
小慧,35,女
Sum
- 功能描述: 求和所有数据
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sumimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)//汇总所有数据tableEnv.sqlQuery(s"select sum(salary) FROM user1").first(100).print()/*** 输出结果** 6800*/}}
- 输出结果
6800
max
- 功能描述: 求最大值
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.maximport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)//汇总所有数据tableEnv.sqlQuery(s"select max(salary) FROM user1 ").first(100).print()/*** 输出结果** 4000*/}}
- 输出结果
4000
min
- 功能描述: 求最小值
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.minimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)tableEnv.sqlQuery(s"select min(salary) FROM user1 ").first(100).print()/*** 输出结果** 500*/}}
- 输出结果
500
sum (group by )
- 功能描述: 按性别分组求和
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.groupimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)//汇总所有数据tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex").first(100).print()/*** 输出结果* * 女,1300* 男,5500*/}}
- 输出结果
女,1300
男,5500
group by having
- 功能描述:
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_havingimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)//分组统计,having是分组条件查询tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) >1500").first(100).print()/*** 输出结果* * */}}
- 输出结果
男,5500
distinct
- 功能描述: 去重一列或多列
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinctimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female"))val tableEnv = TableEnvironment.getTableEnvironment(env)tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)/*** 对数据去重*/tableEnv.sqlQuery("select distinct name FROM user1 ").first(100).print()/*** 输出结果** a* c* d*/}}
- 输出结果
a
c
d
join
INNER JOIN
- 功能描述: 连接两个表,按指定的列,两列都存在值才输出
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)//内连接,两个表// tableEnv.sqlQuery("select * FROM `user` INNER JOIN grade on `user`.id = grade.userId ")tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` INNER JOIN grade on `user`.id = grade.userId ").first(100).print()/*** 输出结果* 2,小王,45,男,4000,数学,80* 1,小明,15,男,1500,语文,100* 1,小明,15,男,1500,外语,50*/}}
- 输出结果
2,小王,45,男,4000,数学,80
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
left join
- 功能描述:连接两个表,按指定的列,左表中存在值就一定输出,右表如果不存在,就显示为空
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)//左连接,拿左边的表中的每一行数据,去关联右边的数据,如果有相同的匹配数据,就都匹配出来,如果没有,就匹配一条,不过右边的数据为空tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` LEFT JOIN grade on `user`.id = grade.userId ").first(100).print()/*** 输出结果** 1,小明,15,男,1500,语文,100* 1,小明,15,男,1500,外语,50* 2,小王,45,男,4000,数学,80* 4,小慧,35,女,500,null,null* 3,小李,25,女,800,null,null***/}}
- 输出结果
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
3,小李,25,女,800,null,null
right join
- 功能描述:连接两个表,按指定的列,右表中存在值就一定输出,左表如果不存在,就显示为空
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)//左连接,拿左边的表中的每一行数据,去关联右边的数据,如果有相同的匹配数据,就都匹配出来,如果没有,就匹配一条,不过右边的数据为空tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` RIGHT JOIN grade on `user`.id = grade.userId ").first(100).print()/*** 输出结果** 1,小明,15,男,1500,外语,50* 1,小明,15,男,1500,语文,100* 2,小王,45,男,4000,数学,80* null,null,null,null,null,外语,90***/}}
- 输出结果
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
null,null,null,null,null,外语,90
full outer join
- 功能描述: 连接两个表,按指定的列,只要有一表中存在值就一定输出,另一表如果不存在就显示为空
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)//左,右,全匹配所有数据tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN grade on `user`.id = grade.userId ").first(100).print()/*** 输出结果*** 3,小李,25,女,800,null,null* 1,小明,15,男,1500,外语,50* 1,小明,15,男,1500,语文,100* 2,小王,45,男,4000,数学,80* 4,小慧,35,女,500,null,null* null,null,null,null,null,外语,90****/}}
- 输出结果
3,小李,25,女,800,null,null
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
null,null,null,null,null,外语,90
Set Operations
union
- 功能描述: 连接两个表中的数据,会去重
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)/*** union 连接两个表,会去重*/tableEnv.sqlQuery("select * from ("+"select t1.* FROM `user` as t1 ) " ++ " UNION "+ " ( select t2.* FROM t2 )").first(100).print()/*** 输出结果** 30,小李,25,女,800* 40,小慧,35,女,500* 2,小王,45,男,4000* 4,小慧,35,女,500* 3,小李,25,女,800* 1,小明,15,男,1500**/}}
- 输出结果
30,小李,25,女,800
40,小慧,35,女,500
2,小王,45,男,4000
4,小慧,35,女,500
3,小李,25,女,800
1,小明,15,男,1500
unionAll
- 功能描述: 连接两表中的数据,不会去重
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAllimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)/*** union 连接两个表,不会去重*/tableEnv.sqlQuery("select * from ("+"select t1.* FROM `user` as t1 ) " ++ " UNION ALL "+ " ( select t2.* FROM t2 )").first(100).print()/*** 输出结果** 1,小明,15,男,1500* 2,小王,45,男,4000* 3,小李,25,女,800* 4,小慧,35,女,500* 1,小明,15,男,1500* 2,小王,45,男,4000* 30,小李,25,女,800* 40,小慧,35,女,500**/}}
- 输出结果
1,小明,15,男,1500
2,小王,45,男,4000
3,小李,25,女,800
4,小慧,35,女,500
1,小明,15,男,1500
2,小王,45,男,4000
30,小李,25,女,800
40,小慧,35,女,500
INTERSECT
- 功能描述: INTERSECT 连接两个表,找相同的数据(相交的数据,重叠的数据)
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersectimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)/*** INTERSECT 连接两个表,找相同的数据(相交的数据,重叠的数据)*/tableEnv.sqlQuery("select * from ("+"select t1.* FROM `user` as t1 ) " ++ " INTERSECT "+ " ( select t2.* FROM t2 )").first(100).print()/*** 输出结果** 1,小明,15,男,1500* 2,小王,45,男,4000**/}}
- 输出结果
1,小明,15,男,15002,小王,45,男,4000
in
- 功能描述: 子查询
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.inimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)/*** in ,子查询*/tableEnv.sqlQuery("select t1.* FROM `user` t1 where t1.id in " +" (select t2.id from t2) ").first(100).print()/*** 输出结果** 1,小明,15,男,1500* 2,小王,45,男,4000**/}}
- 输出结果
1,小明,15,男,15002,小王,45,男,4000
EXCEPT
- 功能描述: EXCEPT 连接两个表,找不相同的数据(不相交的数据,不重叠的数据)
- scala 程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.exceptimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._object Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)/*** EXCEPT 连接两个表,找不相同的数据(不相交的数据,不重叠的数据)*/tableEnv.sqlQuery("select * from ("+"select t1.* FROM `user` as t1 ) " ++ " EXCEPT "+ " ( select t2.* FROM t2 )").first(100).print()/*** 输出结果** 3,小李,25,女,800* 4,小慧,35,女,500**/}}
- 输出结果
3,小李,25,女,8004,小慧,35,女,500
DML
insert into
- 功能描述:将一个表中的数据(source),插入到 csv文件中(sink)
- scala程序
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insertimport org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformationobject Run {def main(args: Array[String]): Unit = {//得到批环境val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))//得到Table环境val tableEnv = TableEnvironment.getTableEnvironment(env)//注册tabletableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)// create a TableSinkval csvSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE);val fieldNames = Array("name", "age", "sex")val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING)tableEnv.registerTableSink("t2",fieldNames,fieldTypes,csvSink)tableEnv.sqlUpdate(s" insert into t2 select name,age,sex FROM user1 ")env.execute()/*** 输出结果* a.csv** 小明,15,男* 小王,45,男* 小李,25,女* 小慧,35,女*/}}
- 输出数据 a.csv
小明,15,男
小王,45,男
小李,25,女
小慧,35,女
Scan
- 功能描述:
- scala 程序
- 输出结果
Flink1.7.2 sql 批处理示例相关推荐
- spring aop示例_Spring批处理示例
spring aop示例 Welcome to Spring Batch Example. Spring Batch is a spring framework module for executio ...
- python创建数据库的sql语句_对python插入数据库和生成插入sql的示例讲解
如下所示: #-*- encoding:utf-8 -*- import csv import sys,os import pymysql def read_csv(filename): ''' 读取 ...
- Hibernate Native SQL查询示例
Hibernate Native SQL查询示例 欢迎使用Hibernate Native SQL Query示例教程.我们在前面的文章中研究了Hibernate查询语言和Hibernate Crit ...
- mysql注入式攻击_mybatis的sql中使用$会出现sql注入示例
mybatis的sql中使用$会出现sql注入示例: 模拟简单登录场景: 页面代码: functionlogin(){//sql注入 var user ={ username :"'李雪雷3 ...
- sql子查询示例_学习SQL:SQL查询示例
sql子查询示例 In the previous article we've practiced SQL, and today, we'll continue with a few more SQL ...
- sql join 示例_SQL CROSS JOIN与示例
sql join 示例 In this article, we will learn the SQL CROSS JOIN concept and support our learnings with ...
- Hibernate本机SQL查询示例
Welcome to the Hibernate Native SQL Query example tutorial. We looked into Hibernate Query Language ...
- SQL批处理与事务控制
今天我想要分享的是关于数据库的批处理与事务的控制.批处理对于项目的实际应用有非常大的具体意义. 一.批处理部分 首先我们新建一个表: create table t3( id int primary k ...
- Flink SQL 批处理使用HOP详解
Flink SQL 批处理使用HOP详解 介绍 Flink SQL中分组窗口函数中有一个HOP(time_attr, hop interval 滑动间隔, fixed duration 窗口时间),适 ...
最新文章
- 我知道今天是写总结的日子-所以买了一罐啤酒喝
- python编程自学能学会吗-python编程还能自学?怎么能学好? - 【大连东软睿道】
- 数据库复习之规范化理论应用(第八次上机内容)
- Live555研究之二Sleep实现
- tomcat GET 编码疑惑
- python 控制手机摄像头_python+open cv调用手机摄像头,保存文件
- Linux基础(8)--关于man命令
- redis---中文文档
- 如何使用GDAL进行AOI裁剪
- 微信宣布:被禁 8 年的限制解除了!
- svn commit 问题 POST of '***/!svn/me' 403 forbidden
- 基于模块化多电平换流器(MMC)的柔性直流输电系统simulink仿真模型开发
- 论文笔记(二)Region Proposal by Guided Anchoring(CVPR2019)
- Mac安装truffle框架时出现✓ Preparing to download box ✖ Downloading Unbox failed! Error: connect的解决方法
- Windows10 U盘无法格式化怎么办?U盘拒绝访问?
- 使用离线语音夜灯联动控制家里其他灯具和电器
- java输出字符串排列组合代码
- JAVA项目:Java实现飞扬的小鸟(Flappy Bird)
- 生信技能9 - 生物信息分析必须掌握的生物学基本概念(建议收藏)
- c#程序连接oracle失败问题