Flink Table和SQL的表和视图、Connectors和timestamp数据类型
目录
- 1. 表和视图
- 2. Table API Connectors
- 2.1 filesystem、print、blackhole
- 3. timestamp和timestamp_ltz
1. 表和视图
表分为临时表和永久表,相同名称下,临时表的优先级比永久表高
永久表需要数据库保存元数据,例如Hive数据库
连接外部数据系统通常用createTemporaryTable,中间结果表通常用createTemporatyView,如下所示:
tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)
2. Table API Connectors
2.1 filesystem、print、blackhole
添加pom.xml依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.3</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version><scope>provided</scope></dependency>
程序如下:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor, long2Literal, row, string2Literal}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyExceptionimport scala.util.control.Breaks.{break, breakable}object flink_test {// 获取Active HDFS Uridef getActiveHdfsUri() = {val hadoopConf = new Configuration()val hdfsUris = Array("hdfs://192.168.23.101:8020","hdfs://192.168.23.102:8020","hdfs://192.168.23.103:8020")var hdfsCli: FileSystem = nullvar hdfsCapacity: Long = -1Lvar activeHdfsUri: String = nullbreakable {for (hdfsUri <- hdfsUris) {hadoopConf.set("fs.defaultFS", hdfsUri)hdfsCli = FileSystem.get(hadoopConf)try {hdfsCapacity = hdfsCli.getStatus.getCapacityactiveHdfsUri = hdfsUribreak} catch {case hdfsException: StandbyException => {}}}}activeHdfsUri}def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironmentsenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"// HDFS表val fileSystemTable = tEnv.from(TableDescriptor.forConnector("filesystem").schema(Schema.newBuilder().column("name", DataTypes.STRING()).column("amount", DataTypes.BIGINT()).build()).option("path", hdfsFilePath).format(FormatDescriptor.forFormat("csv").option("field-delimiter", ",").build()).build())tEnv.createTemporaryView("fileSystemTable", fileSystemTable)// print表tEnv.createTemporaryTable("printSink",TableDescriptor.forConnector("print").schema(Schema.newBuilder().column("name", DataTypes.STRING()).column("amount", DataTypes.BIGINT()).build()).build())// 读取HDFS表数据用print输出, 输出结果和转换成DataStream进行print一样fileSystemTable.executeInsert("printSink")// blackhole表tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")// 读取HDFS表数据到blackholetEnv.executeSql("insert into blackholeSink select * from fileSystemTable")// 转换为DataStream, 输出到blackholeval fileSystemDatastream = tEnv.toDataStream(fileSystemTable)fileSystemDatastream.addSink(new DiscardingSink[Row]())senv.execute()}
}
执行结果如下:
6> +I[zhang_san, 30]
4> +I[li_si, 40]
3. timestamp和timestamp_ltz
- timestamp(p)
p指小数秒的精度,范围为0-9,默认是6
val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")table.execute().print()
输出如下:
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
- timestamp_ltz(p)
用于描述时间线上的绝对时间点, 使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数
无法通过字符串来指定, 可以通过一个long类型的epoch时间来转化。在同一个时间点, 全世界所有的机器上执行System.currentTimeMillis()都会返回同样的值
tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")val table = tEnv.sqlQuery("select * from t1")table.execute().print()
输出如下:
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
- 各种当前时间函数
tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")val table = tEnv.sqlQuery("select * from myView1")table.printSchema()table.execute().print()
输出如下:
(`localtime` TIME(0) NOT NULL,`localtimestamp` TIMESTAMP(3) NOT NULL,`current_date` DATE NOT NULL,`current_time` TIME(0) NOT NULL,`current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime | localtimestamp | current_date | current_time | current_timestamp | EXPR$5 | EXPR$6 | EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 | 12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Flink Table和SQL的表和视图、Connectors和timestamp数据类型相关推荐
- Flink教程(16)- Flink Table与SQL
文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...
- Flink Table Api SQL 初体验,Blink的使用
概述 Flink具有Table API和SQL-用于统一流和批处理. Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接) ...
- Flink教程(17)- Flink Table与SQL(案例与SQL算子)
文章目录 01 引言 02 Flink Table&SQL 案例 2.1 案例1(DataStream SQL统计) 2.2 案例2(DataStream Table&SQL统计) 2 ...
- 创建触发器报PL/SQL: ORA-00942: 表或视图不存在
创建触发器报PL/SQL: ORA-00942: 表或视图不存在,现在如下验证进行重现: 1.创建用户u1赋予dba权限 SQL> create user u1 identified by u1 ...
- Flink Table和SQL的基本API
文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- 2021年大数据Flink(三十):Flink Table API SQL 介绍
目录 Table API & SQL 介绍 为什么需要Table API & SQL Table API& SQL发展历程 架构升级 查询处理器的选 ...
- Flink Table API SQL编程指南(自定义Sources Sinks)
TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...
- 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用
从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...
最新文章
- 怎样更好地团队协作沟通?
- 表单之label标签
- pytorch 归一化 测试(BatchNorm2d)
- 夺命雷公狗---微信开发26----客服消息接口基础和推送视频
- 基于容器原理(docker、lxc、cells)的Android 双系统设计概要
- 微信小程序|开发实战篇之八-list列表组件及其子组件
- JAVA ThreadPoolExecutor线程池
- 改善代码设计 —— 处理概括关系(Dealing with Generalization)
- Linux之crontab(计划任务)
- ROS教程之在自己键盘上控制小海龟移动
- 最流行的三大数据建模工具
- 苹果画画软件_想在iPad 上画画,推荐用这些软件
- 抽象代数 04.02 群在集合上的作用
- 记录学习Android基础的心得07:硬件控制P2
- 用Python实现一个简易的“听歌识曲”demo(一)
- 小猫踢足球-第14届蓝桥杯STEMA测评Scratch真题精选
- python爬虫招聘网站(智联)
- 区块链热点!STO被政府严令禁止
- mysql查询所有姓王的信息_MySQL(4)— 数据查询
- 向量内积和夹角的关系