Table of Contents

  • Flink Table and DataStream Conversion
  • 1. Converting a DataStream to a Table
    • 1.1 Handling (insert-only) streams
      • 1.1.1 The fromDataStream() method
        • 1.1.1.1 fromDataStream(DataStream var1)
        • 1.1.1.2 fromDataStream(DataStream var1, Expression... var2)
        • 1.1.1.3 fromDataStream(DataStream var1, Schema var2)
      • 1.1.2 The createTemporaryView() method
    • 1.2 Handling changelog streams
      • 1.2.1 The fromChangelogStream() method
  • 2. Converting a Table to a DataStream
    • 2.1 Handling (insert-only) streams
      • 2.1.1 The toDataStream() method
    • 2.2 Handling changelog streams
      • 2.2.1 The toChangelogStream() method
        • 2.2.1.1 toChangelogStream(Table var1)
        • 2.2.1.2 toChangelogStream(Table var1, Schema var2)

Flink Table and DataStream Conversion

Reference: the official Flink documentation on Table API / DataStream integration.

1. Converting a DataStream to a Table

1.1 Handling (insert-only) streams

1.1.1 The fromDataStream() method:

/**
 * Interprets a stream of insert-only changes and arbitrary types as a table.
 * Event time and watermarks are not propagated by default.
 */
<T> Table fromDataStream(DataStream<T> var1);

/**
 * Same as above, but the given expressions project and rename the resulting columns.
 */
<T> Table fromDataStream(DataStream<T> var1, Expression... var2);

/**
 * Interprets a stream of insert-only changes and arbitrary types as a table.
 * The optional schema allows enriching column data types and adding time attributes,
 * watermark strategies, other computed columns, or primary keys.
 */
<T> Table fromDataStream(DataStream<T> var1, Schema var2);

Official example code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;

// some example POJO
public static class User {
    public String name;
    public Integer score;
    public Instant event_time;

    // default constructor for DataStream API
    public User() {}

    // fully assigning constructor for Table API
    public User(String name, Integer score, Instant event_time) {
        this.name = name;
        this.score = score;
        this.event_time = event_time;
    }
}

// create a DataStream
DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

// === EXAMPLE 1 ===
// derive all physical columns automatically
// Example 1 illustrates a simple use case when no time-based operations are needed.

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9)
// )

// === EXAMPLE 2 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)
// Example 2 is the most common use case when time-based operations should work in processing time.

Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT NOT NULL,
//  `event_time` TIMESTAMP_LTZ(9),
//  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
// )

// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )

// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before (not part of this example)
// Example 4 is the most common use case when time-based operations such as windows or interval joins should be part of the pipeline.

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )

// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
//   - we can reduce the default precision of timestamps from 9 to 3
//   - we also project the columns and put `event_time` to the beginning
// Example 5 relies entirely on the user's manual declarations.

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
//  `name` VARCHAR(200),
//  `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection

// Using DataTypes, which is more flexible than TypeInformation
Table table = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", DataTypes.of(User.class))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

// data types can be extracted reflectively as above or explicitly defined
Table table3 = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column(
                "f0",
                DataTypes.STRUCTURED(
                    User.class,
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("score", DataTypes.INT())))
            .build())
    .as("user");
table3.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

The following demos exercise the three overloads above.
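The demos rely on a few helper classes the post never shows: FlinkEnv (creates the execution environments), DataGeneratorImpl003 (a DataGenerator<String> that emits JSON strings shaped like the ones visible in the printed results), and the Event POJO. The sketches below are hypothetical reconstructions inferred from how the demos use them, not the author's actual code:

package com.ali.flink.demo.utils;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// hypothetical reconstruction of the environment factory used by all demos
public class FlinkEnv {

    public static StreamExecutionEnvironment FlinkDataStreamRunEnv() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create(env);
    }
}

package com.ali.flink.demo.bean;

// hypothetical reconstruction of the Event POJO, inferred from the demos'
// constructor call and the toString() output in the printed results
public class Event {

    public String name;
    public String title;
    public String title_name;
    public int title_number;
    public String user_info;
    public String address;
    public String time_info;
    public long timestamp;

    // default constructor required by the DataStream API
    public Event() {}

    public Event(String name, String title, String title_name, int title_number,
                 String user_info, String address, String time_info, long timestamp) {
        this.name = name;
        this.title = title;
        this.title_name = title_name;
        this.title_number = title_number;
        this.user_info = user_info;
        this.address = address;
        this.time_info = time_info;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{name='" + name + "', title=" + title + ", title_name='" + title_name
                + "', title_number=" + title_number + ", user_info=" + user_info
                + ", address='" + address + "', time_info=" + time_info
                + ", timestamp='" + timestamp + "'}";
    }
}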

1.1.1.1 fromDataStream(DataStream var1)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableTransToStreamDemo001 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
//        sourceStream.print("source");

        // the map function converts the raw JSON string into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // convert the POJO stream into a Table; since the stream carries a POJO,
        // the resulting Table's columns match the POJO's fields
        Table table = tableEnv.fromDataStream(mapStream);

        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);

        // run SQL; the POJO field names can be used directly in the query
        Table result = tableEnv.sqlQuery("select name, title_name, address from source");

        // convert the result Table back to a DataStream for output
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom1', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom1, 表情包, 北京市]
1.1.1.2 fromDataStream(DataStream var1, Expression… var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkTableTransToStreamDemo002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
//        sourceStream.print("source");

        // the map function converts the raw JSON string into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // select specific columns via expressions; as() renames a column
        Table table = tableEnv.fromDataStream(mapStream, $("name").as("username"), $("address"));

        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);

        // run SQL against the view
        Table result = tableEnv.sqlQuery("select username, address from source");

        // convert the result Table back to a DataStream for output
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom3', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom3, 北京市]
1.1.1.3 fromDataStream(DataStream var1, Schema var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableTransToStreamDemo003 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
//        sourceStream.print("source");

        // the map function converts the raw JSON string into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // define the schema explicitly
        Table table = tableEnv.fromDataStream(mapStream,
                Schema.newBuilder()
                        .column("name", "string")
                        .column("timestamp", "bigint")
                        .build());

        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);

        // run SQL; `timestamp` must be escaped with backticks because it is a reserved keyword
        Table result = tableEnv.sqlQuery("select name, `timestamp` from source");

        // convert the result Table back to a DataStream for output
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom3', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom3, 1657332118000]

1.1.2 The createTemporaryView() method:
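The post shows only the official usage example for this method; for reference, the DataStream-accepting overloads on StreamTableEnvironment look roughly as follows (a sketch based on the Flink 1.13+ javadoc; verify against your Flink version):

// Registers the DataStream as a temporary view named `path` in the current session;
// all columns are derived automatically.
<T> void createTemporaryView(String path, DataStream<T> dataStream);

// Same as above, but a Schema can adjust the derived columns,
// analogous to fromDataStream(DataStream, Schema).
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);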

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;

// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
    Tuple2.of(12L, "Alice"),
    Tuple2.of(0L, "Bob"));

// === EXAMPLE 1 ===
// register the DataStream as view "MyView" in the current session
// all columns are derived automatically

tableEnv.createTemporaryView("MyView", dataStream);

tableEnv.from("MyView").printSchema();
// prints:
// (
//  `f0` BIGINT NOT NULL,
//  `f1` STRING
// )

// === EXAMPLE 2 ===
// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`
// in this example, the derived NOT NULL information has been removed

tableEnv.createTemporaryView(
    "MyView",
    dataStream,
    Schema.newBuilder()
        .column("f0", "BIGINT")
        .column("f1", "STRING")
        .build());

tableEnv.from("MyView").printSchema();
// prints:
// (
//  `f0` BIGINT,
//  `f1` STRING
// )

// === EXAMPLE 3 ===
// use the Table API before creating the view if it is only about renaming columns

tableEnv.createTemporaryView(
    "MyView",
    tableEnv.fromDataStream(dataStream).as("id", "name"));

tableEnv.from("MyView").printSchema();
// prints:
// (
//  `id` BIGINT NOT NULL,
//  `name` STRING
// )

1.2 Handling changelog streams

1.2.1 The fromChangelogStream() method

The stream record type must be org.apache.flink.types.Row. Each record is created with Row.ofKind(RowKind kind, Object... values), where RowKind has one of four values: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), and DELETE (-D); a short sketch covering all four follows the method signatures below.

// Interprets a stream of changelog entries as a table. The stream record type must be
// org.apache.flink.types.Row, since its RowKind flag is evaluated at runtime.
// Event time and watermarks are not propagated by default. This method expects a changelog
// containing all kinds of changes (enumerated in org.apache.flink.types.RowKind) as the
// default ChangelogMode.
@Experimental
Table fromChangelogStream(DataStream<Row> var1);

// Allows adjusting the columns of the DataStream, similar to fromDataStream(DataStream, Schema).
// Otherwise the semantics are equal to fromChangelogStream(DataStream).
@Experimental
Table fromChangelogStream(DataStream<Row> var1, Schema var2);

// Gives full control over how to interpret the stream as a changelog. The passed ChangelogMode
// helps the planner to distinguish between insert-only, upsert, or retract behavior.
@Experimental
Table fromChangelogStream(DataStream<Row> var1, Schema var2, ChangelogMode var3);
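Before the official examples, here is a minimal sketch (made-up data, using the same env/tableEnv as the surrounding examples) that builds a changelog covering all four RowKind values, including DELETE, which the official examples below do not use:

// hypothetical changelog stream exercising all four kinds of changes
DataStream<Row> changelog =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),        // +I: insert a new row
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), // -U: retract the old value
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100), // +U: emit the updated value
        Row.ofKind(RowKind.DELETE, "Alice", 100));      // -D: remove the row entirely

// interpreted as a retract stream by default
Table changelogTable = tableEnv.fromChangelogStream(changelog);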

Official example code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// === EXAMPLE 1 ===
// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+

2. Converting a Table to a DataStream

2.1 Handling (insert-only) streams

2.1.1 The toDataStream() method:
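The post goes straight to the official example; for orientation, the toDataStream() overloads on StreamTableEnvironment look roughly as follows (a sketch based on the Flink 1.13+ javadoc; verify against your Flink version):

// Converts the given Table into a DataStream of Row instances (the default conversion).
DataStream<Row> toDataStream(Table table);

// Converts the given Table into a DataStream of the given class; the planner reorders
// fields and inserts implicit casts where possible to reach the target type.
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

// Converts the given Table into a DataStream of the given explicit data type.
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);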

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;

// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {
    public String name;
    public Integer score;
    public Instant event_time;
}

tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===
// use the default conversion to instances of Row

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<Row> dataStream = tableEnv.toDataStream(table);

// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

// data types can be extracted reflectively as above or explicitly defined
DataStream<User> dataStream =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

2.2 Handling changelog streams

2.2.1 The toChangelogStream() method:
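As with toDataStream() above, a rough sketch of the overloads (Flink 1.13+; the third variant is not demonstrated in this post):

// Converts the Table into a changelog DataStream of Row instances; each record's
// RowKind flag encodes the kind of change.
DataStream<Row> toChangelogStream(Table table);

// Same as above, but the given Schema controls the produced columns, e.g. to expose
// the rowtime as a metadata column (see 2.2.1.2 below).
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

// Additionally accepts a ChangelogMode to restrict which kinds of changes are emitted.
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);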

2.2.1.1 toChangelogStream(Table var1)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.utils.FlinkEnv;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkStreamTransToTableDemo001 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        // build a small aggregating Table; the result is an updating (changelog) table
        Table simpleTable = tableEnv
                .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
                .as("name", "score")
                .groupBy($("name"))
                .select($("name"), $("score").sum().as("score"));

        simpleTable.printSchema();
        simpleTable.execute().print();

        // convert the updating Table into a changelog DataStream and print each record
        tableEnv.toChangelogStream(simpleTable)
                .executeAndCollect()
                .forEachRemaining(System.out::println);

        env.execute("job start");
    }
}

------------------------------ Result --------------------------------
+I[Alice, 12]
-U[Alice, 12]
+U[Alice, 14]
+I[Bob, 12]
2.2.1.2 toChangelogStream(Table var1, Schema var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.utils.FlinkEnv;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkStreamTransToTableDemo002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        // create a Table with an event-time attribute and watermark
        tableEnv.executeSql("CREATE TABLE GeneratedTable "
                + "("
                + "  name STRING,"
                + "  score INT,"
                + "  event_time TIMESTAMP_LTZ(3),"
                + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
                + ")"
                + "WITH ('connector'='datagen')");

        Table table = tableEnv.from("GeneratedTable");

        // expose the rowtime as a metadata column in the produced changelog stream
        DataStream<Row> dataStream = tableEnv.toChangelogStream(table,
                Schema.newBuilder()
                        .column("name", "string")
                        .column("score", "int")
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .build());
        dataStream.print("dataStream");

        env.execute("job start");
    }
}

------------------------------ Result --------------------------------
dataStream> +I[02295d4b23932df652d9e1eb07da611d68613f7e75794680c3f4a29627f94dacf2d82bf5fc3183f5af2d5fad0ab6c1d45272, -316104097]
dataStream> +I[ef45440d96c3ba64bbf4a143f773b26356fcb955abdb352913c30131cc900c52a2f20efc4b5ef4eda86d5e1518c38654e822, 2048383718]
