Flink 1.7.2 SQL Batch Processing Examples

Source code

  • https://github.com/opensourceteams/flink-maven-scala

Overview

  • This article walks through Flink SQL examples on the batch DataSet API
  • Operations covered: Scan / Select, as (table), as (column), limit, Where / Filter, between and (where), Sum, min, max, avg, sum (group by), group by having, distinct, INNER JOIN, left join, right join, full outer join, union, unionAll, INTERSECT, in, EXCEPT, insert into

SELECT

Scan / Select

  • Function: query all rows of a table
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    tableEnv.sqlQuery(s"select name,age FROM user1").first(100).print()
  }
}

  • Output
小明,15
小王,45
小李,25
小慧,35

as (table)

  • Function: assign an alias to a table name
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1").first(100).print()
  }
}

  • Output
小明,15
小王,45
小李,25
小慧,35

as (column)

  • Function: assign aliases to columns
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // column aliases, with and without the AS keyword
    tableEnv.sqlQuery(s"select name a,age as b FROM user1").first(100).print()
  }
}

  • Output
小明,15
小王,45
小李,25
小慧,35

limit

  • Function: return only the first N rows of a table (applied per parallel instance, so different parallelism settings give different results)
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.limit

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // sort by age descending and output the first 100 rows; the sort runs
    // within each parallel instance (partition), so with parallelism > 1
    // the printed order is not globally sorted
    tableEnv.sqlQuery(s"select name,age FROM user1 ORDER BY age desc LIMIT 100").first(100).print()

    // with parallelism 1 the output would be fully sorted:
    // 小王,45 / 小慧,35 / 小李,25 / 小明,15
  }
}

  • Output (parallelism 2)
小明,15
小王,45
小慧,35
小李,25

Where / Filter

  • Function: filter rows by a condition on a column
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = '女'").first(100).print()
  }
}

  • Output

小李,25,女
小慧,35,女

between and (where)

  • Function: filter a column to a range, start <= value <= end
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAnd

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // BETWEEN is inclusive on both ends
    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and 35").first(100).print()
  }
}

  • Output
小李,25,女
小慧,35,女

Sum

  • Function: sum a column over all rows
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex, 'salary)

    // sum over all rows
    tableEnv.sqlQuery(s"select sum(salary) FROM user1").first(100).print()
  }
}

  • Output
6800

max

  • Function: find the maximum value of a column
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex, 'salary)

    // maximum over all rows
    tableEnv.sqlQuery(s"select max(salary) FROM user1").first(100).print()
  }
}

  • Output
4000

min

  • Function: find the minimum value of a column
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex, 'salary)

    // minimum over all rows
    tableEnv.sqlQuery(s"select min(salary) FROM user1").first(100).print()
  }
}

  • Output
500
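The overview also lists avg, although the original has no section for it. Presumably it follows the same pattern as sum/min/max, e.g. a hypothetical `select avg(salary) FROM user1` over the same salary data; since AVG over an INT column typically stays integral (integer division) in Calcite-based engines, the expected result would be 1700. A plain-Scala check of that arithmetic, with no Flink dependency:

```scala
object AvgCheck extends App {
  // the four sample salaries used by the aggregation examples
  val salaries = Seq(1500, 4000, 800, 500)

  // integer average, as AVG over an INT column would compute
  val avg = salaries.sum / salaries.size
  println(avg) // 1700
}
```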

sum (group by)

  • Function: sum salaries grouped by sex
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex, 'salary)

    // sum salary within each sex group
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex").first(100).print()
  }
}

  • Output
女,1300
男,5500

group by having

  • Function: group rows, then filter the groups with an aggregate condition (HAVING)
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_having

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex, 'salary)

    // group, then keep only groups whose aggregate satisfies the HAVING clause
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) > 1500").first(100).print()
  }
}

  • Output
男,5500

distinct

  • Function: deduplicate one or more columns
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // deduplicate the name column
    tableEnv.sqlQuery("select distinct name FROM user1").first(100).print()
  }
}

  • Output
a
c
d

join

INNER JOIN

  • Function: join two tables on the given columns; a row is output only when both sides match
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables ("user" is a reserved word, hence the backticks below)
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("grade", dataSetGrade, 'userId, 'name, 'fraction)

    // inner join of the two tables
    // tableEnv.sqlQuery("select * FROM `user` INNER JOIN grade on `user`.id = grade.userId")
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` INNER JOIN grade on `user`.id = grade.userId")
      .first(100).print()
  }
}

  • Output
2,小王,45,男,4000,数学,80
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50

left join

  • Function: join two tables on the given columns; every row of the left table is output, with nulls where the right table has no match
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("grade", dataSetGrade, 'userId, 'name, 'fraction)

    // left join: each left row is matched against the right table; every match
    // produces a row, and a left row with no match still produces one row with
    // the right-side columns set to null
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` LEFT JOIN grade on `user`.id = grade.userId")
      .first(100).print()
  }
}

  • Output
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
3,小李,25,女,800,null,null
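Flink aside, the left-join behavior described above (every left row survives; unmatched right-side fields become null) can be sketched with plain Scala collections. The object name and reduced schemas here are illustrative only, with `Option` standing in for SQL null:

```scala
object LeftJoinSketch extends App {
  val users  = Seq((1, "小明"), (2, "小王"), (3, "小李"))
  val grades = Seq((1, "语文", 100), (2, "数学", 80), (1, "外语", 50))

  // one output row per match; a single None-padded row when nothing matches
  val joined = users.flatMap { case (id, name) =>
    val matches = grades.filter(_._1 == id)
    if (matches.isEmpty) Seq((id, name, None, None))
    else matches.map { case (_, subject, score) => (id, name, Some(subject), Some(score)) }
  }

  joined.foreach(println)
}
```

小明 appears twice (two matching grades), 小王 once, and 小李 once with None padding, mirroring the null-padded rows in the Flink output.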

right join

  • Function: join two tables on the given columns; every row of the right table is output, with nulls where the left table has no match
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("grade", dataSetGrade, 'userId, 'name, 'fraction)

    // right join: every row of the right table is output; right rows with no
    // matching left row get the left-side columns set to null
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` RIGHT JOIN grade on `user`.id = grade.userId")
      .first(100).print()
  }
}

  • Output
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
null,null,null,null,null,外语,90

full outer join

  • Function: join two tables on the given columns; rows from either side are output, with nulls wherever the other side has no match
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("grade", dataSetGrade, 'userId, 'name, 'fraction)

    // full outer join: unmatched rows from both sides are kept, null-padded
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN grade on `user`.id = grade.userId")
      .first(100).print()
  }
}

  • Output
3,小李,25,女,800,null,null
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
null,null,null,null,null,外语,90

Set Operations

union

  • Function: combine the rows of two tables, removing duplicates
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("t2", dataSet2, 'id, 'name, 'age, 'sex, 'salary)

    // UNION combines the two tables and removes duplicate rows
    tableEnv.sqlQuery(
      "select * from (" +
        "select t1.* FROM `user` as t1 ) " +
        " UNION " +
        " ( select t2.* FROM t2 )")
      .first(100).print()
  }
}

  • Output
30,小李,25,女,800
40,小慧,35,女,500
2,小王,45,男,4000
4,小慧,35,女,500
3,小李,25,女,800
1,小明,15,男,1500
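The deduplication that distinguishes UNION from UNION ALL is ordinary set semantics, independent of Flink. A minimal plain-Scala sketch (object name and reduced rows are illustrative only):

```scala
object UnionSketch extends App {
  val t1 = Seq((3, "小李"), (4, "小慧"))
  val t2 = Seq((3, "小李"), (40, "小慧"))

  // UNION keeps each distinct row once; UNION ALL is plain concatenation
  val union    = (t1 ++ t2).distinct
  val unionAll = t1 ++ t2

  println(union.size)    // 3: the shared row (3,小李) appears once
  println(unionAll.size) // 4: duplicates are kept
}
```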

unionAll

  • Function: combine the rows of two tables, keeping duplicates
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("t2", dataSet2, 'id, 'name, 'age, 'sex, 'salary)

    // UNION ALL combines the two tables without removing duplicates
    tableEnv.sqlQuery(
      "select * from (" +
        "select t1.* FROM `user` as t1 ) " +
        " UNION ALL " +
        " ( select t2.* FROM t2 )")
      .first(100).print()
  }
}

  • Output
1,小明,15,男,1500
2,小王,45,男,4000
3,小李,25,女,800
4,小慧,35,女,500
1,小明,15,男,1500
2,小王,45,男,4000
30,小李,25,女,800
40,小慧,35,女,500

INTERSECT

  • Function: return the rows that appear in both tables (the overlap)
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("t2", dataSet2, 'id, 'name, 'age, 'sex, 'salary)

    // INTERSECT keeps only the rows present in both tables
    tableEnv.sqlQuery(
      "select * from (" +
        "select t1.* FROM `user` as t1 ) " +
        " INTERSECT " +
        " ( select t2.* FROM t2 )")
      .first(100).print()
  }
}

  • Output

1,小明,15,男,1500
2,小王,45,男,4000
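INTERSECT is plain set intersection; the same overlap falls out of Scala collections without any Flink dependency (object name and reduced rows are illustrative only):

```scala
object IntersectSketch extends App {
  val t1 = Seq((1, "小明"), (2, "小王"), (3, "小李"), (4, "小慧"))
  val t2 = Seq((1, "小明"), (2, "小王"), (30, "小李"), (40, "小慧"))

  // keep only the rows that appear in both inputs
  val common = t1.intersect(t2)
  common.foreach(println) // (1,小明) then (2,小王)
}
```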

in

  • Function: filter with an IN subquery
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("t2", dataSet2, 'id, 'name, 'age, 'sex, 'salary)

    // IN with a subquery
    tableEnv.sqlQuery(
      "select t1.* FROM `user` t1 where t1.id in " +
        " (select t2.id from t2) ")
      .first(100).print()
  }
}

  • Output

1,小明,15,男,1500
2,小王,45,男,4000

EXCEPT

  • Function: return the rows of the first table that do not appear in the second
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.except

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user", dataSet, 'id, 'name, 'age, 'sex, 'salary)
    tableEnv.registerDataSet("t2", dataSet2, 'id, 'name, 'age, 'sex, 'salary)

    // EXCEPT keeps the rows of the first table that are absent from the second
    tableEnv.sqlQuery(
      "select * from (" +
        "select t1.* FROM `user` as t1 ) " +
        " EXCEPT " +
        " ( select t2.* FROM t2 )")
      .first(100).print()
  }
}

  • Output

3,小李,25,女,800
4,小慧,35,女,500
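EXCEPT is set difference; on duplicate-free rows like these, Scala's `Seq#diff` gives the same result with no Flink dependency (object name and reduced rows are illustrative only):

```scala
object ExceptSketch extends App {
  val t1 = Seq((1, "小明"), (2, "小王"), (3, "小李"), (4, "小慧"))
  val t2 = Seq((1, "小明"), (2, "小王"), (30, "小李"), (40, "小慧"))

  // keep the rows of the first input that are absent from the second
  val onlyInT1 = t1.diff(t2)
  onlyInT1.foreach(println) // (3,小李) then (4,小慧)
}
```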

DML

insert into

  • Function: insert data from a table (source) into a CSV file (sink)
  • Scala program

package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insert

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the source table
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // create a CSV TableSink (path, field delimiter, number of files, write mode)
    val csvSink = new CsvTableSink("sink-data/csv/a.csv", ",", 1, WriteMode.OVERWRITE)
    val fieldNames = Array("name", "age", "sex")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING)
    tableEnv.registerTableSink("t2", fieldNames, fieldTypes, csvSink)

    tableEnv.sqlUpdate(s"insert into t2 select name,age,sex FROM user1")

    env.execute()
  }
}

  • Output file a.csv
小明,15,男
小王,45,男
小李,25,女
小慧,35,女

