Contents

  • 1. tEnv.fromDataStream(datastream[, schema])
  • 2. tEnv.toDataStream(table[, DataTypes/class])
  • 3. tEnv.fromChangelogStream(datastream[, Schema[, ChangelogMode]])
  • 4. tEnv.toChangelogStream(table[, Schema[, ChangelogMode]])
  • 5. StatementSet.attachAsDataStream()

1. tEnv.fromDataStream(datastream[, schema])

tEnv.createTemporaryView("new_table", datastream[, schema])等价于tEnv.createTemporaryView("new_table", tEnv.fromDataStream(datastream[, schema]))

The implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toTable(tEnv[, Schema/Expression*($("col1"), $("col2"))])
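For example, a minimal sketch of the implicit form (the column names are illustrative; $ comes from org.apache.flink.table.api.Expressions):

import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.dataStreamConversions

// Convert the datastream to a Table, naming its columns via expressions
val table = datastream.toTable(tEnv, $("name"), $("url"), $("cTime"))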

Example program:

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}

case class UserClick(name: String, url: String, cTime: Long)

class RecordTimestampAssigner extends TimestampAssigner[UserClick] {
  override def extractTimestamp(element: UserClick, recordTimestamp: Long): Long = {
    element.cTime
  }
}

class PeriodWatermarkGenerator extends WatermarkGenerator[UserClick] {
  var maxTimestamp: Long = _
  val maxOutofOrderness = 5

  // 1. Emit a watermark based on a special event
  override def onEvent(event: UserClick, eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(event.cTime, maxTimestamp)
  }

  // 2. Emit watermarks periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}

class MyWatermarkStrategy extends WatermarkStrategy[UserClick] {
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[UserClick] = {
    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserClick] = {
    new PeriodWatermarkGenerator()
  }
}

object flink_test {
  def main(args: Array[String]): Unit = {
    val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val datastream1 = senv.fromElements(
      UserClick("zhang_san", "./home", fdf.parse("2022-01-27 07:58:18").getTime),
      UserClick("li_si", "./cart", fdf.parse("2022-01-27 07:59:36").getTime)
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
    println(datastream1.dataType)
    datastream1.print()

    val table1 = tEnv.fromDataStream(
      datastream1,
      Schema.newBuilder()
        // Either define none of the datastream's columns in the schema, or define all of them; the types can be changed here
        .column("name", DataTypes.STRING())
        .column("url", DataTypes.STRING())
        .column("cTime", DataTypes.BIGINT())
        // Add a new column
        .columnByExpression("new_cTime", "to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss'))")
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build()
    )
    table1.printSchema()
    table1.execute().print()

    senv.execute()
  }
}

The execution result is as follows:

UserClick(name: String, url: String, cTime: Long)
(
  `name` STRING,
  `url` STRING,
  `cTime` BIGINT,
  `new_cTime` TIMESTAMP(3) AS to_timestamp(from_unixtime(cast(cTime as bigint) / 1000, 'yyyy-MM-dd HH:mm:ss')),
  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op |                           name |                            url |                cTime |               new_cTime |                 rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 | 2022-01-27 07:58:18.000 |
| +I |                          li_si |                         ./cart |        1643241576000 | 2022-01-27 07:59:36.000 | 2022-01-27 07:59:36.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+
2 rows in set
1> UserClick(zhang_san,./home,1643241498000)
2> UserClick(li_si,./cart,1643241576000)

Notes:

  • Only insert-only datastreams are supported; datastreams of any other form are simply treated as insert-only
  • The table automatically derives its column names and field types from the datastream
  • If the datastream carries timestamps and watermarks, Schema.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) exposes the datastream's timestamp as a regular column, and Schema.watermark("rowtime", "SOURCE_WATERMARK()") derives the table's watermark from the datastream's watermark; the rowtime column then automatically becomes the event-time (rowtime) column. The watermark SQL expression can also be "rowtime - interval '10' second", as in the sketch below
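A minimal sketch of that alternative watermark expression (assuming the same datastream1 as in the example; the schema defines none of the physical columns, which is also allowed):

val table2 = tEnv.fromDataStream(
  datastream1,
  Schema.newBuilder()
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    // Derive the watermark from the rowtime column itself rather than from the source watermark
    .watermark("rowtime", "rowtime - interval '10' second")
    .build()
)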

2. tEnv.toDataStream(table[, DataTypes/class])

The implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.tableConversions
table.toDataStream([DataTypes/class])

Example program:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}

/*
val datetime1: java.time.Instant = java.time.Instant.ofEpochMilli(3000L)
println(datetime1)      // 1970-01-01T00:00:03Z
val timestamp1: Long = datetime1.toEpochMilli
println(timestamp1)     // 3000
*/

case class User(name: String, score: Double, dataTime: java.time.Instant)

object flink_test {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.createTemporaryTable(
      "default_catalog.default_database.dataSource",
      TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("score", DataTypes.DOUBLE())
          .column("dataTime", DataTypes.TIMESTAMP_LTZ(3))
          .watermark("dataTime", "dataTime - interval '10' second")
          .build())
        .option("rows-per-second", "1") // Number of rows generated per second per slot
        .build()
    )

    // tEnv.from(TableDescriptor)
    val dataSourceTable = tEnv.from("dataSource")
    dataSourceTable.printSchema()
    // tEnv.executeSql("select * from dataSource").print()

    // tEnv.toDataStream(dataSourceTable, classOf[User])
    // tEnv.toDataStream(dataSourceTable, DataTypes.of(classOf[User]))
    val dataSourceDatastream: DataStream[User] = tEnv.toDataStream(
      dataSourceTable,
      // The default type is Row<String, Double, Timestamp_LTZ>; this form lets you change the column types
      DataTypes.STRUCTURED(
        classOf[User],
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.DOUBLE()),
        DataTypes.FIELD("dataTime", DataTypes.TIMESTAMP_LTZ(3))
      )
    )
    print(dataSourceDatastream.dataType)
    dataSourceDatastream.executeAndCollect().foreach(println)
  }
}

The output is as follows:

(
  `name` STRING,
  `score` DOUBLE,
  `dataTime` TIMESTAMP_LTZ(3) *ROWTIME*,
  WATERMARK FOR `dataTime`: TIMESTAMP_LTZ(3) AS dataTime - interval '10' second
)
*User<`name` STRING, `score` DOUBLE, `dataTime` TIMESTAMP_LTZ(3)>*(User, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
User(d089222b0e15ce426897416fa0b23b2eb6a7d35dda0d8e23f61c673608b24faac09d3e2bc12b3d21aa6c018b0cf51ef0f980,1.6976293338728425E308,2022-01-27T04:30:47.009Z)
User(379a52cd0330f35856622d68b9c86be979de32a20ab3401bc377389a59494b325b50c9cb0186e7ac27170fa7c86c9f3ed0d1,5.483394273040764E307,2022-01-27T04:30:47.009Z)
User(c38f9276eec2c9f7d8bb8cba26665ff6bde5af68474e8ea65c6944f2c5fb457932fb0ab6f50172e2826880748b384c103241,4.605199108304596E307,2022-01-27T04:30:47.009Z)
...... remaining output omitted ......

Notes:

  • Only insert-only tables are supported; any other form raises an error immediately
  • Timestamps and watermarks are automatically passed on to the datastream
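The commented-out calls in the example show the shorter variants; a minimal sketch of the class-based form (assuming the same User case class and dataSourceTable as above):

// Let Flink derive the structured type directly from the case class
val userStream: DataStream[User] = tEnv.toDataStream(dataSourceTable, classOf[User])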

3. tEnv.fromChangelogStream(datastream[, Schema[, ChangelogMode]])

The implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.dataStreamConversions
datastream.toChangelogTable(tEnv[, Schema[, ChangelogMode]])

Example program:

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

class RecordTimestampAssigner extends TimestampAssigner[Row] {
  override def extractTimestamp(element: Row, recordTimestamp: Long): Long = {
    element.getFieldAs[Long]("f2")
  }
}

class PeriodWatermarkGenerator extends WatermarkGenerator[Row] {
  var maxTimestamp: Long = _
  val maxOutofOrderness = 5

  // 1. Emit a watermark based on a special event
  override def onEvent(event: Row, eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(event.getFieldAs[Long]("f2"), maxTimestamp)
  }

  // 2. Emit watermarks periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}

class MyWatermarkStrategy extends WatermarkStrategy[Row] {
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Row] = {
    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Row] = {
    new PeriodWatermarkGenerator()
  }
}

object flink_test {
  def main(args: Array[String]): Unit = {
    val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // With both RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER, the changelog stream is in retract form
    // With only RowKind.UPDATE_AFTER, the changelog stream is in upsert form
    val datastream1 = senv.fromElements(
      Row.ofKind(RowKind.INSERT, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 07:58:18").getTime)),
      Row.ofKind(RowKind.INSERT, "li_si", "./cart", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
      Row.ofKind(RowKind.UPDATE_AFTER, "li_si", "./cart2", Long.box(fdf.parse("2022-01-27 07:59:36").getTime)),
      Row.ofKind(RowKind.DELETE, "zhang_san", "./home", Long.box(fdf.parse("2022-01-27 12:59:36").getTime))
    )(Types.ROW(Types.STRING, Types.STRING, Types.LONG))
      .assignTimestampsAndWatermarks(new MyWatermarkStrategy())
    println(datastream1.dataType)
    datastream1.print()

    val table1 = tEnv.fromChangelogStream(
      datastream1,
      Schema.newBuilder()
        .primaryKey("f0", "f1")
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .build(),
      ChangelogMode.upsert()
    )
    table1.printSchema()
    table1.execute().print()

    senv.execute()
  }
}

The result is as follows:

Row(f0: String, f1: String, f2: Long)
(
  `f0` STRING NOT NULL,
  `f1` STRING NOT NULL,
  `f2` BIGINT,
  `rowtime` TIMESTAMP_LTZ(3) METADATA,
  CONSTRAINT `PK_f0_f1` PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
)
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| op |                             f0 |                             f1 |                   f2 |                 rowtime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| +I |                          li_si |                         ./cart |        1643241576000 | 2022-01-27 07:59:36.000 |
| +I |                          li_si |                        ./cart2 |        1643241576000 | 2022-01-27 07:59:36.000 |
| +I |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 |
| -D |                      zhang_san |                         ./home |        1643241498000 | 2022-01-27 07:58:18.000 |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
4 rows in set
1> -D[zhang_san, ./home, 1643259576000]
6> +I[zhang_san, ./home, 1643241498000]
7> +I[li_si, ./cart, 1643241576000]
8> +U[li_si, ./cart2, 1643241576000]

Notes:

  • The datastream's element type must be Row, and each Row must carry a RowKind
  • Timestamps and watermarks are not passed along by default
  • ChangelogMode defaults to retract (all) and specifies the form of the data in the datastream (a sketch of the default retract form follows this list)
  • ChangelogMode.upsert() requires a primary key; the datastream's RowKind.UPDATE_AFTER records then become inserts in the table
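For comparison, a minimal sketch of the default retract form (assuming the same datastream1 as in the example; no primary key is required, and UPDATE_BEFORE/UPDATE_AFTER pairs are interpreted as retractions):

// Without an explicit Schema or ChangelogMode, retract form (ChangelogMode.all()) is assumed
val retractTable = tEnv.fromChangelogStream(datastream1)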

4. tEnv.toChangelogStream(table[, Schema[, ChangelogMode]])

The implicit conversion is used as follows:

import org.apache.flink.table.api.bridge.scala.tableConversions
table.toChangelogStream([Schema[, ChangelogMode]])

import org.apache.flink.table.api.bridge.scala.tableToChangelogDataStream
val datastream: DataStream[Row] = table

Example program:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, long2Literal, row, string2Literal}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import scala.collection.JavaConversions.asScalaIterator

object flink_test {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val table1 = tEnv.fromValues(
      row("zhang_san", 10, "2022-01-27 15:00:06"),
      row("li_si", 100, "2022-01-27 16:00:06"),
      row("zhang_san", 20, "2022-01-27 15:00:16"),
      row("zhang_san", 30, "2022-01-27 15:00:36"),
      row("li_si", 200, "2022-01-27 16:00:56")
    ).as("name", "amount", "dataTime")
    table1.printSchema()
    table1.execute().collect().foreach(println)

    val resultTable = table1
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    resultTable.printSchema()
    resultTable.execute().print()

    val resultDatastream: DataStream[Row] = tEnv.toChangelogStream(
      resultTable,
      Schema.newBuilder()
        .column("name", DataTypes.STRING().bridgedTo(classOf[String]))
        .column("amount", DataTypes.BIGINT().bridgedTo(classOf[java.lang.Long]))
        .build(),
      ChangelogMode.all()
    )
    print(resultDatastream.dataType)
    resultDatastream.print()
    resultDatastream.executeAndCollect().foreach(row => {
      println(row.getKind)
      println(row.getFieldNames(true))
    })
  }
}

The result is as follows:

(
  `name` VARCHAR(9) NOT NULL,
  `amount` BIGINT NOT NULL,
  `dataTime` CHAR(19) NOT NULL
)
+I[zhang_san, 10, 2022-01-27 15:00:06]
+I[li_si, 100, 2022-01-27 16:00:06]
+I[zhang_san, 20, 2022-01-27 15:00:16]
+I[zhang_san, 30, 2022-01-27 15:00:36]
+I[li_si, 200, 2022-01-27 16:00:56]
(
  `name` VARCHAR(9) NOT NULL,
  `amount` BIGINT NOT NULL
)
+----+--------------------------------+----------------------+
| op |                           name |               amount |
+----+--------------------------------+----------------------+
| +I |                      zhang_san |                   10 |
| -U |                      zhang_san |                   10 |
| +U |                      zhang_san |                   30 |
| -U |                      zhang_san |                   30 |
| +U |                      zhang_san |                   60 |
| +I |                          li_si |                  100 |
| -U |                          li_si |                  100 |
| +U |                          li_si |                  300 |
+----+--------------------------------+----------------------+
8 rows in set
ROW<`name` STRING, `amount` BIGINT> NOT NULL(org.apache.flink.types.Row, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
2> +I[zhang_san, 10]
4> +I[li_si, 100]
4> -U[li_si, 100]
2> -U[zhang_san, 10]
4> +U[li_si, 300]
2> +U[zhang_san, 30]
2> -U[zhang_san, 30]
2> +U[zhang_san, 60]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
INSERT
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]
UPDATE_BEFORE
[name, amount]
UPDATE_AFTER
[name, amount]

Notes:

  • The table's timestamps and watermarks are passed on to the datastream
  • ChangelogMode specifies the form of the table's data. In this example the table's stream is in retract form, so specifying ChangelogMode.insertOnly() raises an error, while specifying ChangelogMode.upsert() produces a datastream without the -U (UPDATE_BEFORE) records (see the sketch below)
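A minimal sketch of the upsert variant (assuming the same resultTable as in the example; upsert mode relies on the unique key that the groupBy result carries on name):

// Upsert mode: the -U (UPDATE_BEFORE) records are dropped from the output
val upsertStream: DataStream[Row] = tEnv.toChangelogStream(
  resultTable,
  Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("amount", DataTypes.BIGINT())
    .build(),
  ChangelogMode.upsert()
)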

5. StatementSet.attachAsDataStream()

This converts the multiple Table API and SQL insert operations collected in a StatementSet into DataStream pipelines, clears the StatementSet at the same time, and the job is finally triggered by senv.execute():

val statementSet = tEnv.createStatementSet()
statementSet.addInsert(......)
statementSet.addInsertSql(......)
statementSet.attachAsDataStream()
senv.execute()
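A slightly fuller sketch of the pattern (sourceTable, sinkA and sinkB are hypothetical table names registered elsewhere):

val statementSet = tEnv.createStatementSet()

// Collect several INSERT operations before triggering anything
statementSet.addInsert("sinkA", tEnv.from("sourceTable"))
statementSet.addInsertSql("insert into sinkB select * from sourceTable")

// Convert the collected inserts into DataStream pipelines and clear the set
statementSet.attachAsDataStream()

// A single call now triggers the attached inserts together with any other DataStream pipelines
senv.execute()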
