Flink Table Api SQL 初体验,Blink的使用
概述
- Flink具有Table API和SQL-用于统一流和批处理。
- Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
- Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。
Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看 官方文档
使用
多表连接案例
pom依赖
flink 版本为:1.9.3
<dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency>
模拟一个实时流
public class Item {public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String name;public Integer id;public Item() {}@Overridepublic String toString() {return "Item{" +"name='" + name + '\'' +", id=" + id +'}';}
自定义Source
import common.Item;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.ArrayList;
import java.util.Random;public class MyStreamingSource implements SourceFunction<Item> {private boolean isRunning = true;@Overridepublic void run(SourceContext<Item> ctx) throws Exception {while (isRunning){Item item = generateItem();ctx.collect(item);Thread.sleep(1000);}}/*** 随机产生一条记录** @return*/private Item generateItem(){int i = new Random().nextInt(100);ArrayList<String> list = new ArrayList();list.add("HAT");list.add("TIE");list.add("SHOE");Item item = new Item();item.setName(list.get(new Random().nextInt(3)));item.setId(i);return item;}@Overridepublic void cancel() {}
}
主程序
public class TableStremingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();// 使用BlinkEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStremingSource()).map(new MapFunction<Item, Item>() {@Overridepublic Item map(Item value) throws Exception {return value;}});// 分割流final OutputTag<Item> even = new OutputTag<Item>("even") {};final OutputTag<Item> old = new OutputTag<Item>("old") {};SingleOutputStreamOperator<Item> sideOutputData = source.process(new ProcessFunction<Item, Item>() {@Overridepublic void processElement(Item value, Context ctx, Collector<Item> out) throws Exception {if (value.getId() % 2 == 0) {ctx.output(even,value);}else{ctx.output(old,value);}}});DataStream<Item> evenStream = sideOutputData.getSideOutput(even);DataStream<Item> oldStream = sideOutputData.getSideOutput(old);// 注册两个 表 : evenTable,oddTablebsTableEnv.registerDataStream("evenTable",evenStream , "name,id");bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");// 执行sql 输出TableTable queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");queryTable.printSchema();;// 获取流DataStream<Tuple2<Boolean, Tuple4<Integer, String, Integer, String>>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){}));dataStream.print();bsEnv.execute("demo");}
}
结果打印
输出name相同的元素。
总结
简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。
更多文章:www.ipooli.com
扫码关注公众号《ipoo》
Flink Table Api SQL 初体验,Blink的使用相关推荐
- 2021年大数据Flink(三十):Flink Table API SQL 介绍
目录 Table API & SQL 介绍 为什么需要Table API & SQL Table API& SQL发展历程 架构升级 查询处理器的选 ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Flink Table API SQL编程指南(自定义Sources Sinks)
TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...
- Flink Table和SQL的基本API
文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...
- flink Table API 与SQL入门实战
流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...
- Flink Table API和SQL(下)
传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...
- 1.18.Table API SQL(概念、依赖图、Table程序依赖、扩展依赖)
1.18.Table API & SQL 1.18.1.概念 1.18.1.1.依赖图 1.18.1.2.Table程序依赖 1.18.1.3.扩展依赖 1.18.Table API & ...
- Flink教程(16)- Flink Table与SQL
文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...
- java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z
完整报错如下: select * from dim_behavior; [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuc ...
最新文章
- c语言既适合于开发,C语言试题及答案 (1)
- stdout标准输出、stderr标准错误输出 标准输入、标准输出、标准错误输出分别被定义为0、1、2。
- [转载]一句话插配置文件
- 安装了Node.js 从VScode 使用node -v 和 npm -v等命令却无效
- 《ArcGIS Runtime SDK for Android开发笔记》——(15)、要素绘制Drawtools3.0工具DEMO
- Linux文件系统变成只读的解决方法
- python音频实时频谱分析_基于python的音频设计及频谱分析
- Install-Package Ninject -Project SportsStore.WebUI
- 【Alpha】Scrum Meeting 2
- JAVA Eclipse如何设置点击按钮切换图片
- 计算机的发展导致了计算思维的诞生,尔雅电子计算机的诞生(上)
- vue baidu-map百度地图自定义换肤、去掉百度地图logo
- 『C++』endl、ends和flush的区别
- vue使用provide / inject 组合刷新页面+单独组件刷新
- macbook air恢复出厂设置
- 天玑9200和骁龙8+哪个好 天玑9200和骁龙8+gen1对比
- 信任=自信+他信+信他
- PowerDesigner 修改 DBMS
- Photoshop CS2 视频教程-PS锁定图层(转)
- 计算服务——弹性云服务器