SQL部分学习

Table API的特点Table API和SQL都是Apache Flink中高等级的分析API,SQL所具备的特点Table API也都具有,如下:

声明式 - 用户只关心做什么,不用关心怎么做;
高性能 - 支持查询优化,可以获取最好的执行性能;
流批统一 - 相同的统计逻辑,既可以流模式运行,也可以批模式运行;
标准稳定 - 语义遵循SQL标准,语法语义明确,不易变动。

当然除了SQL的特性,因为Table API是在Flink中专门设计的,所以Table API还具有自身的特点:

表达方式的扩展性 - 在Flink中可以为Table API开发很多便捷性功能,如:Row.flatten(), map/flatMap 等
功能的扩展性 - 在Flink中可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
编译检查 - Table API支持java和scala语言开发,支持IDE中进行编译检查。

Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:


```xml
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId><version>1.6.1</version>
</dependency>

另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:```java
```xml
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.6.1</version>
</dependency>
3.2. Table API和SQL程序的结构Table API一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,再用类似于filter, join, 或者 select关系型转化操作来转化为一个新的Table对象。最后将一个Table对象转回一个DataSet或DataStream。从内部实现上来说,所有应用于Table的转化操作都变成一棵逻辑表操作树,在Table对象被转化回DataSet或者DataStream之后,转化器会将逻辑表操作树转化为对等的DataSet或者DataStream操作符。Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;所以我们只需要使用一种来演示即可要想执行flink的SQL语句,首先需要获取SQL的执行环境:两种方式(batch和streaming):批处理:```java
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

流处理:

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

在内部目录中注册一个表
注册外部目录
执行SQL查询
注册用户定义的(标量,表格或聚合)函数
转换DataStream或DataSet成Table
持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统输入表可以从各种来源注册:

现有Table对象,通常是表API或SQL查询的结果。
TableSource,它访问外部数据,例如文件,数据库或消息传递系统。
DataStream或DataSet来自DataStream或DataSet程序。

输出表可以使用注册TableSink。

注册一个表

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table “projectedX”
tableEnv.registerTable(“projectedTable”, projTable)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("projectedTable ").select(…)

注册一个TableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],…)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,…)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")...
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注册一个TableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],…)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
Table和DataStream和DataSet的集成
package com.ccj.pxj.sql
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sinks.CsvTableSink
object DataSet_DataStreamToTable {case class Order1(id:Long,proudct:String,amount:Int)def main(args: Array[String]): Unit = {//1.  获取流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.  获取TableEnvironmentval tableEnv= TableEnvironment.getTableEnvironment(env)//3.  加载本地集合val dataStream: DataStream[Order1] = env.fromCollection(List(Order1(1, "beer", 3),Order1(2, "diaper", 4), Order1(3, "ruber", 2)))//4.  根据数据注册表tableEnv.registerDataStream("s",dataStream)//5.  执行SQLval table = tableEnv.sqlQuery("select * from s")//6.  写入CSV文件中table.printSchema()table.writeToSink(new CsvTableSink("./data/score_sql.csv",",",1,FileSystem.WriteMode.OVERWRITE))//7.  执行任务env.execute()}
}
将Table转换为DataStream或DataSet

Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。

当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:

Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问   
POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问   
Case Class:字段通过位置映射,不支持空值,类型安全访问   
Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问   
Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。

将Table转换为DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。

有两种模式可以将 Table转换为DataStream:

1:Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发送的结果不会被更新。

2:Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。

package com.ccj.pxj.sql
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
object TableTODataStream {def main(args: Array[String]): Unit = {//1. 获取流处理环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//  2. 设置并行度env.setParallelism(1)//  3. 获取Table运行环境val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//  4. 加载本地集合val dataStream: DataStream[(Long, Int, String)] = env.fromCollection(List((1L, 1, "Hello"),(2L, 2, "Hello"),(6L, 6, "Hello"),(7L, 7, "Hello World"),(8L, 8, "Hello World"),(20L, 20, "Hello World")))//  5. 转换DataStream为Tableval table: Table = tableEnv.fromDataStream(dataStream)//  6. 将table转换为DataStream----将一个表附加到流上Append Modeval appendDataStream: DataStream[(Long, Int, String)] = tableEnv.toAppendStream[(Long, Int, String)](table)//7. 将table转换为DataStream----Retract Mode true代表添加消息,false代表撤销消息val retractDataStream: DataStream[(Boolean, (Long, Int, String))] = tableEnv.toRetractStream[(Long, Int, String)](table)//8. 打印输出appendDataStream.print()println("----------------------")retractDataStream.print()//  9. 执行任务env.execute("pxj")}
}
将Table转换为DataSet
package com.ccj.pxj.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
object TableTODataSet {def main(args: Array[String]): Unit = {//1. 获取批处理环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2. 设置并行度env.setParallelism(1)//3. 获取Table运行环境val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)//4. 加载本地集合val dataSet: DataSet[(Long, Int, String)] = env.fromCollection(List((1L, 1, "Hello"),(2L, 2, "Hello"),(3L, 3, "Hello"),(7L, 7, "Hello World"),(8L, 8, "Hello World"),(20L, 20, "Hello World")))//5. DataSet转换为Tableval table: Table = tableEnv.fromDataSet(dataSet)//6. table转换为dataSetval result: DataSet[(Long, Int, String)] = tableEnv.toDataSet[(Long, Int, String)](table)//7. 打印输出result.print()println("--------------------------------")println(table)println("---------------")table.printSchema()//env.execute()}
}
批处理案例1:
package com.ccj.pxj.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
object BatchFlinkSqlDemo {//创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额)case class  Order(id:Int, userName:String, createTime:String, money:Double)def main(args: Array[String]): Unit = {//1. 获取一个批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//  2. 获取一个Table运行环境val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)//  3. 创建一个样例类`Order`用来映射数据(订单名、用户名、订单日期、订单金额)//4. 基于本地`Order`集合创建一个DataSet sourceval dataSet: DataSet[Order] = env.fromCollection(List(Order(1, "zhangsan", "2018-10-20 15:30", 358.5),Order(2, "zhangsan", "2018-10-20 16:30", 131.5),Order(3, "lisi", "2018-10-20 16:30", 127.5),Order(4, "lisi", "2018-10-20 16:30", 328.5),Order(5, "lisi", "2018-10-20 16:30", 432.5),Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)))//5. 使用Table运行环境将DataSet注册为一张表tableEnv.registerDataSet("t",dataSet)//  6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)val table: Table = tableEnv.sqlQuery("select userName,sum(money)" +",max(money)" +",min(money),count(1) from t group by userName")//7. 使用TableEnv.toDataSet将Table转换为DataSetval resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table)//8. 打印测试table.printSchema()println("-----")resultDataSet.print()// env.execute("pxj")}
}
批处理案例2
package com.ccj.pxj.sql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
object BatchTableDemo {def main(args: Array[String]): Unit = {//1. 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2. 获取Table运行环境val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)//3. 加载外部CSV文件val csvTableSource: CsvTableSource = CsvTableSource.builder().path("data/score.csv") //加载文件路径.field("id", Types.INT) // 列名,类型定义.field("name", Types.STRING).field("subjectId", Types.INT).field("score", Types.DOUBLE).fieldDelimiter(",") // 属性间分隔符.lineDelimiter("\n") // 换行符//      .ignoreFirstLine()              // 忽略第一行内容.ignoreParseErrors() // 忽略解析错误.build()//4. 将外部数据构建成表tableEnv.registerTableSource("t",csvTableSource)//5. 使用table方式查询数据val table: Table = tableEnv.scan("t").select("id,name,subjectId,score").filter("name='张三'")//6. 打印表结构table.printSchema()//7. 将数据落地到新的CSV文件中table.writeToSink(new CsvTableSink("./data/score_table.csv",",",1,FileSystem.WriteMode.OVERWRITE))// 8. 执行任务env.execute()}
}

流数据处理案例

import java.util.UUID
import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.types.Rowimport scala.util.Randomobject StreamFlinkSqlDemo {//  4. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)case class Order(id: String, userId: Int, money: Int, createTime: Long)def main(args: Array[String]): Unit = {//  1. 获取流处理运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//    2. 获取Table运行环境val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)//    3. 设置处理时间为`EventTime`env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//  5. 创建一个自定义数据源val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {//    - 使用for循环生成1000个订单for (i <- 0 until 1000) {//  - 随机生成订单ID(UUID)val id = UUID.randomUUID().toString//  - 随机生成用户ID(0-2)val userId = Random.nextInt(3)//  - 随机生成订单金额(0-100)val money = Random.nextInt(101)//  - 时间戳为当前系统时间val timestamp = System.currentTimeMillis()// 收集数据ctx.collect(Order(id, userId, money, timestamp))//  - 每隔1秒生成一个订单TimeUnit.SECONDS.sleep(1)}}override def cancel(): Unit = {}})//  6. 添加水印,允许延迟2秒val waterDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order]() {var currentTimeStamp = 0L// 获取水印override def getCurrentWatermark: Watermark = {new Watermark(currentTimeStamp - 2000)}// 获取当前时间override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {currentTimeStamp = Math.max(element.createTime, previousElementTimestamp)currentTimeStamp}})//  7. 导入`import org.apache.flink.table.api.scala._`隐式参数import org.apache.flink.table.api.scala._//    8. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段tableEnv.registerDataStream("t_order",waterDataStream,'id, 'userId, 'money, 'createTime.rowtime)//  9. 编写SQL语句统计用户订单总数、最大金额、最小金额//  - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口val sql ="""|select| userId,| count(1) as totalCount,| max(money) as maxMoney,| min(money) as minMoney| from| t_order| group by| userId,| tumble(createTime, interval '5' second)""".stripMargin//  10. 使用`tableEnv.sqlQuery`执行sql语句val table: Table = tableEnv.sqlQuery(sql)//    11. 将SQL的执行结果转换成DataStream再打印出来tableEnv.toAppendStream[Row](table).print()//    12. 启动流处理程序env.execute("pxj")}}

作者:pxj
日期:2021-07-18

flink之SQL入门相关推荐

  1. 实时数仓入门训练营:实时计算 Flink 版 SQL 实践

    简介:<实时数仓入门训练营>由阿里云研究员王峰.阿里云高级产品专家刘一鸣等实时计算 Flink 版和 Hologres 的多名技术/产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打 ...

  2. Flink 最锋利的武器:Flink SQL 入门和实战

    学习路径:<2021年最新从零到大数据专家学习路径指南> 面      试:<2021年最新版大数据面试题全面开启更新> [注意]:Flink1.9版本后的Flink SQL使 ...

  3. 第一天:什么是Flink、WordCount入门、Flink安装、并行度

    1. 初识 Flink 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.目前比较流行的大数据处理引擎 Apach ...

  4. Flink:从入门到放弃

    文章目录 前言 一.Flink简介 1. Flink组件栈 2. Flink基石 3. Fink的应用场景 3.1 Event-driven Applications[事件驱动] 3.2 Data A ...

  5. 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  6. 使用flink Table Sql api来构建批量和流式应用(2)Table API概述

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  7. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  8. MySQL用户管理及SQL入门

    第1章 Mysql用户管理: 1.1 用户的定义: 用户名+主机域 mysql> select user,host,password from mysql.user; +------+----- ...

  9. sql数据黑马程序员——SQL入门

    最近研究sql数据,稍微总结一下,以后继续补充: ---------------------- ASP.Net+Android+IO开辟S..Net培训.等待与您交流! --------------- ...

最新文章

  1. python3 异步 asyncio aiohttp aiohttp-requests aiofiles 使用
  2. 财务大数据比赛有python吗-大数据工作内容有哪些?老男孩Python数据培训
  3. 按钮添加边框和边框色
  4. python实现文件上传和下载_[Python] socket实现TFTP上传和下载
  5. npm -S -D的区别
  6. 时隔6年,NASA再造仿人机器人,或将在太空工作,应对严苛环境
  7. 旗正规则引擎内存表出错的原因及解决方法
  8. 集成电路pad指的是什么_芯片、半导体、集成电路,你分清楚了吗?
  9. 华为P40渲染图再曝光:果然是年度真旗舰
  10. 翻译:Hystrix - How To Use
  11. Phonegap VS AppCan
  12. 图表控件MsChart使用demo
  13. Openstack 虚拟机通讯
  14. linux启动和grub修复
  15. C# 计算农历日期方法 2022
  16. HTC Vive榜单:盘点一周最受欢迎的VR应用
  17. 关爱女性健康的移动产品竞品分析报告
  18. 【Go Web学习笔记】第三章 Go与表单的操作
  19. Word排版——毕业论文专业排版3——编号+多级列表
  20. Excel应用{数据加工与公式函数}

热门文章

  1. Vue-视频加载(vue-video-player)支持.mp4.m3u8.flv.mov格式
  2. python 福利吧_福利吧自动签到脚本
  3. CTF靶场系列结——综合环境
  4. 意淫系列-2018美图春招笔试题
  5. 2017年5月许小年最新演讲:深圳人没房的,还是咬咬牙就买吧!
  6. IDEA导出apk文件
  7. 68 文本左右对齐
  8. FFmpeg再学习 -- 硬件加速编解码
  9. 【概念】操作手册和用户手册的区别及制作
  10. 海思3559开发常识储备:相关名词全解