目录

  • 1. 表和视图
  • 2. Table API Connectors
    • 2.1 filesystem、print、blackhole
  • 3. timestamp和timestamp_ltz

1. 表和视图

表分为临时表和永久表,相同名称下,临时表的优先级比永久表高
永久表需要数据库保存元数据,例如Hive数据库

连接外部数据系统通常用createTemporaryTable,中间结果表通常用createTemporatyView,如下所示:

tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)

2. Table API Connectors

2.1 filesystem、print、blackhole

添加pom.xml依赖

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.3</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version><scope>provided</scope></dependency>

程序如下:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor, long2Literal, row, string2Literal}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyExceptionimport scala.util.control.Breaks.{break, breakable}object flink_test {// 获取Active HDFS Uridef getActiveHdfsUri() = {val hadoopConf = new Configuration()val hdfsUris = Array("hdfs://192.168.23.101:8020","hdfs://192.168.23.102:8020","hdfs://192.168.23.103:8020")var hdfsCli: FileSystem = nullvar hdfsCapacity: Long = -1Lvar activeHdfsUri: String = nullbreakable {for (hdfsUri <- hdfsUris) {hadoopConf.set("fs.defaultFS", hdfsUri)hdfsCli = FileSystem.get(hadoopConf)try {hdfsCapacity = hdfsCli.getStatus.getCapacityactiveHdfsUri = hdfsUribreak} catch {case hdfsException: StandbyException => {}}}}activeHdfsUri}def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironmentsenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"// HDFS表val fileSystemTable = tEnv.from(TableDescriptor.forConnector("filesystem").schema(Schema.newBuilder().column("name", DataTypes.STRING()).column("amount", DataTypes.BIGINT()).build()).option("path", hdfsFilePath).format(FormatDescriptor.forFormat("csv").option("field-delimiter", ",").build()).build())tEnv.createTemporaryView("fileSystemTable", fileSystemTable)// print表tEnv.createTemporaryTable("printSink",TableDescriptor.forConnector("print").schema(Schema.newBuilder().column("name", DataTypes.STRING()).column("amount", DataTypes.BIGINT()).build()).build())// 读取HDFS表数据用print输出, 输出结果和转换成DataStream进行print一样fileSystemTable.executeInsert("printSink")// blackhole表tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")// 读取HDFS表数据到blackholetEnv.executeSql("insert into blackholeSink select * from fileSystemTable")// 转换为DataStream, 输出到blackholeval fileSystemDatastream = tEnv.toDataStream(fileSystemTable)fileSystemDatastream.addSink(new DiscardingSink[Row]())senv.execute()}
}

执行结果如下:

6> +I[zhang_san, 30]
4> +I[li_si, 40]

3. timestamp和timestamp_ltz

  1. timestamp(p)
    p指小数秒的精度,范围为0-9,默认是6
    val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")table.execute().print()

输出如下:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
  1. timestamp_ltz(p)
    用于描述时间线上的绝对时间点, 使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数
    无法通过字符串来指定, 可以通过一个long类型的epoch时间来转化。在同一个时间点, 全世界所有的机器上执行System.currentTimeMillis()都会返回同样的值
    tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")val table = tEnv.sqlQuery("select * from t1")table.execute().print()

输出如下:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
  1. 各种当前时间函数
    tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")val table = tEnv.sqlQuery("select * from myView1")table.printSchema()table.execute().print()

输出如下:

(`localtime` TIME(0) NOT NULL,`localtimestamp` TIMESTAMP(3) NOT NULL,`current_date` DATE NOT NULL,`current_time` TIME(0) NOT NULL,`current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,`EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime |          localtimestamp | current_date | current_time |       current_timestamp |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  12:59:06 | 2022-02-07 12:59:06.859 |   2022-02-07 |     12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

Flink Table和SQL的表和视图、Connectors和timestamp数据类型相关推荐

  1. Flink教程(16)- Flink Table与SQL

    文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...

  2. Flink Table Api SQL 初体验,Blink的使用

    概述 Flink具有Table API和SQL-用于统一流和批处理. Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接) ...

  3. Flink教程(17)- Flink Table与SQL(案例与SQL算子)

    文章目录 01 引言 02 Flink Table&SQL 案例 2.1 案例1(DataStream SQL统计) 2.2 案例2(DataStream Table&SQL统计) 2 ...

  4. 创建触发器报PL/SQL: ORA-00942: 表或视图不存在

    创建触发器报PL/SQL: ORA-00942: 表或视图不存在,现在如下验证进行重现: 1.创建用户u1赋予dba权限 SQL> create user u1 identified by u1 ...

  5. Flink Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  6. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  7. 2021年大数据Flink(三十):Flink ​​​​​​​Table API  SQL 介绍

    目录 ​​​​​​​Table API & SQL 介绍 为什么需要Table API & SQL ​​​​​​​Table API& SQL发展历程 架构升级 查询处理器的选 ...

  8. Flink Table API SQL编程指南(自定义Sources Sinks)

    TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...

  9. 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

最新文章

  1. 怎样更好地团队协作沟通?
  2. 表单之label标签
  3. pytorch 归一化 测试(BatchNorm2d)
  4. 夺命雷公狗---微信开发26----客服消息接口基础和推送视频
  5. 基于容器原理(docker、lxc、cells)的Android 双系统设计概要
  6. 微信小程序|开发实战篇之八-list列表组件及其子组件
  7. JAVA ThreadPoolExecutor线程池
  8. 改善代码设计 —— 处理概括关系(Dealing with Generalization)
  9. Linux之crontab(计划任务)
  10. ROS教程之在自己键盘上控制小海龟移动
  11. 最流行的三大数据建模工具
  12. 苹果画画软件_想在iPad 上画画,推荐用这些软件
  13. 抽象代数 04.02 群在集合上的作用
  14. 记录学习Android基础的心得07:硬件控制P2
  15. 用Python实现一个简易的“听歌识曲”demo(一)
  16. 小猫踢足球-第14届蓝桥杯STEMA测评Scratch真题精选
  17. python爬虫招聘网站(智联)
  18. 区块链热点!STO被政府严令禁止
  19. mysql查询所有姓王的信息_MySQL(4)— 数据查询
  20. 向量内积和夹角的关系

热门文章

  1. 被挟制路由器DNS该怎样实时发觉和防备方法
  2. 蚂蚁金服准备上市,财务自由的声音还是躺赚的落幕?
  3. AI ProCon圆满落幕,五大技术专场精彩瞬间不容错过
  4. 心理学相关学习备忘录
  5. 即时聊天表情功能的实现
  6. 试题 历届试题 对局匹配
  7. 线段树进阶之清风拂面
  8. 【Python技能树共建】selenium入手篇
  9. v-if 和v-for能放在一起使用吗
  10. 晶体管问世;科幻巨匠诞生 | 历史上的今天