概述

  • Flink具有Table API和SQL-用于统一流和批处理。
  • Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
  • Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。
    Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看 官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3

<dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency>

模拟一个实时流

public class Item {public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String name;public Integer id;public Item() {}@Overridepublic String toString() {return "Item{" +"name='" + name + '\'' +", id=" + id +'}';}

自定义Source


import common.Item;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.ArrayList;
import java.util.Random;public class MyStreamingSource implements SourceFunction<Item> {private boolean isRunning = true;@Overridepublic void run(SourceContext<Item> ctx) throws Exception {while (isRunning){Item item = generateItem();ctx.collect(item);Thread.sleep(1000);}}/*** 随机产生一条记录** @return*/private Item generateItem(){int i = new Random().nextInt(100);ArrayList<String> list = new ArrayList();list.add("HAT");list.add("TIE");list.add("SHOE");Item item = new Item();item.setName(list.get(new Random().nextInt(3)));item.setId(i);return item;}@Overridepublic void cancel() {}
}

主程序

public class TableStremingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();// 使用BlinkEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStremingSource()).map(new MapFunction<Item, Item>() {@Overridepublic Item map(Item value) throws Exception {return value;}});// 分割流final OutputTag<Item> even = new OutputTag<Item>("even") {};final OutputTag<Item> old = new OutputTag<Item>("old") {};SingleOutputStreamOperator<Item> sideOutputData = source.process(new ProcessFunction<Item, Item>() {@Overridepublic void processElement(Item value, Context ctx, Collector<Item> out) throws Exception {if (value.getId() % 2 == 0) {ctx.output(even,value);}else{ctx.output(old,value);}}});DataStream<Item> evenStream = sideOutputData.getSideOutput(even);DataStream<Item> oldStream = sideOutputData.getSideOutput(old);// 注册两个 表 : evenTable,oddTablebsTableEnv.registerDataStream("evenTable",evenStream , "name,id");bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");// 执行sql 输出TableTable queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");queryTable.printSchema();;// 获取流DataStream<Tuple2<Boolean, Tuple4<Integer, String, Integer, String>>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){}));dataStream.print();bsEnv.execute("demo");}
}

结果打印


输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com

扫码关注公众号《ipoo》

Flink Table Api SQL 初体验,Blink的使用相关推荐

  1. 2021年大数据Flink(三十):Flink ​​​​​​​Table API  SQL 介绍

    目录 ​​​​​​​Table API & SQL 介绍 为什么需要Table API & SQL ​​​​​​​Table API& SQL发展历程 架构升级 查询处理器的选 ...

  2. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  3. Flink Table API SQL编程指南(自定义Sources Sinks)

    TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...

  4. Flink Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  5. flink Table API 与SQL入门实战

    流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...

  6. Flink Table API和SQL(下)

    传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...

  7. 1.18.Table API SQL(概念、依赖图、Table程序依赖、扩展依赖)

    1.18.Table API & SQL 1.18.1.概念 1.18.1.1.依赖图 1.18.1.2.Table程序依赖 1.18.1.3.扩展依赖 1.18.Table API & ...

  8. Flink教程(16)- Flink Table与SQL

    文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...

  9. java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z

    完整报错如下: select * from dim_behavior; [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuc ...

最新文章

  1. c语言既适合于开发,C语言试题及答案 (1)
  2. stdout标准输出、stderr标准错误输出 标准输入、标准输出、标准错误输出分别被定义为0、1、2。
  3. [转载]一句话插配置文件
  4. 安装了Node.js 从VScode 使用node -v 和 npm -v等命令却无效
  5. 《ArcGIS Runtime SDK for Android开发笔记》——(15)、要素绘制Drawtools3.0工具DEMO
  6. Linux文件系统变成只读的解决方法
  7. python音频实时频谱分析_基于python的音频设计及频谱分析
  8. Install-Package Ninject -Project SportsStore.WebUI
  9. 【Alpha】Scrum Meeting 2
  10. JAVA Eclipse如何设置点击按钮切换图片
  11. 计算机的发展导致了计算思维的诞生,尔雅电子计算机的诞生(上)
  12. vue baidu-map百度地图自定义换肤、去掉百度地图logo
  13. 『C++』endl、ends和flush的区别
  14. vue使用provide / inject 组合刷新页面+单独组件刷新
  15. macbook air恢复出厂设置
  16. 天玑9200和骁龙8+哪个好 天玑9200和骁龙8+gen1对比
  17. 信任=自信+他信+信他
  18. PowerDesigner 修改 DBMS
  19. Photoshop CS2 视频教程-PS锁定图层(转)
  20. 计算服务——弹性云服务器

热门文章

  1. OpenCV_basis
  2. 华为od统一考试B卷【按身高体重排队】C语言 实现
  3. 微信JS-SDK录音的speex音频文件转换为wav
  4. 重返帝国T0阵容搭配
  5. 《如何让你爱的人爱上你》第四部分:等价原则
  6. 期货CTP接口C++源码与C#应用程序的对接
  7. POJ 1417 True Liars 带权并查集 + 背包
  8. python将txt坐标批量打印到原图上
  9. 2019二级建造师-法规-基础班-精讲班课程更新进度
  10. SAP 批量修改或添加BOM组件