Flink教程(20)- Flink高级特性(双流Join)
文章目录
- 01 引言
- 02 双流join介绍
- 03 Window Join
- 3.1 Tumbling Window Join
- 3.2 Sliding Window Join
- 3.3 Session Window Join
- 04 Interval Join
- 05 案例讲解
- 5.1 案例1
- 5.2 案例2
- 06 文末
01 引言
在前面的博客,我们学习了Flink
的BroadcastState
了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
- 《Flink教程(13)- Flink高级API(状态管理)》
- 《Flink教程(14)- Flink高级API(容错机制)》
- 《Flink教程(15)- Flink高级API(并行度)》
- 《Flink教程(16)- Flink Table与SQL》
- 《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》
- 《Flink教程(18)- Flink阶段总结》
- 《Flink教程(19)- Flink高级特性(BroadcastState)》
本文主要讲解Flink
的高级特性其中之一的双流Join
。
02 双流join介绍
Join大体分类只有两种:Window Join和Interval Join。
Window Join又可以根据Window的类型细分出3种:
- Tumbling Window Join
- Sliding Window Join
- Session Widnow Join
Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;
interval join也是利用state
存储数据再处理,区别在于state
中的数据有失效机制,依靠数据触发数据清理;目前Stream join
的结果是数据的笛卡尔积;
03 Window Join
3.1 Tumbling Window Join
执行翻滚窗口联接时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射!
如图所示,我们定义了一个大小为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]、。。。。该图显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
3.2 Sliding Window Join
在执行滑动窗口联接时,具有公共键和公共滑动窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会连接到一个滑动窗口中,但不会连接到另一个滑动窗口中!
在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[0,1],[1,2],[2,3]…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里,您还可以看到,例如,在窗口[2,3]中,橙色②与绿色③连接,但在窗口[1,2]中没有与任何对象连接。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
3.3 Session Window Join
在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!
在这里,我们定义了一个会话窗口连接,其中每个会话被至少1ms的间隔分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
04 Interval Join
前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?
- interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。
在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});
05 案例讲解
5.1 案例1
需求:使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。
思路:
- Window Join首先需要使用where和equalTo指定使用哪个key来进行关联,此处我们通过应用方法,基于GoodsId来关联两个流中的元素。
- 设置5秒的滚动窗口,流的元素关联都会在这个5秒的窗口中进行关联。
- apply方法中实现将两个不同类型的元素关联并生成一个新类型的元素。
示例代码:
/*** 双流join案例1** @author : YangLinWei* @createTime: 2022/3/8 11:17 下午*/
public class JoinDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建商品数据流DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());// 构建订单明细数据流DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());// 进行关联查询DataStream<FactOrderItem> factOrderItemDS = orderItemDS.join(goodsDS)// 第一个流orderItemDS.where(OrderItem::getGoodsId)// 第二流goodsDS.equalTo(Goods::getGoodsId).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((OrderItem item, Goods goods) -> {FactOrderItem factOrderItem = new FactOrderItem();factOrderItem.setGoodsId(goods.getGoodsId());factOrderItem.setGoodsName(goods.getGoodsName());factOrderItem.setCount(new BigDecimal(item.getCount()));factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(item.getCount())));return factOrderItem;});factOrderItemDS.print();env.execute("滚动窗口JOIN");}//商品类@Datapublic static class Goods {private String goodsId;private String goodsName;private BigDecimal goodsPrice;public static List<Goods> GOODS_LIST;public static Random r;static {r = new Random();GOODS_LIST = new ArrayList<>();GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));}public static Goods randomGoods() {int rIndex = r.nextInt(GOODS_LIST.size());return GOODS_LIST.get(rIndex);}public Goods() {}public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {this.goodsId = goodsId;this.goodsName = goodsName;this.goodsPrice = goodsPrice;}@Overridepublic String toString() {return JSON.toJSONString(this);}}//订单明细类@Datapublic static class OrderItem {private String itemId;private String goodsId;private Integer count;@Overridepublic String toString() {return JSON.toJSONString(this);}}//关联结果@Datapublic static class FactOrderItem {private String goodsId;private String goodsName;private BigDecimal count;private BigDecimal totalMoney;@Overridepublic String toString() {return JSON.toJSONString(this);}}//构建一个商品Stream源(这个好比就是维表)public static class GoodsSource11 extends RichSourceFunction {private Boolean isCancel;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//构建订单明细Stream源public static class OrderItemSource extends RichSourceFunction {private Boolean isCancel;private Random r;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;r = new Random();}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods goods = Goods.randomGoods();OrderItem orderItem = new OrderItem();orderItem.setGoodsId(goods.getGoodsId());orderItem.setCount(r.nextInt(10) + 1);orderItem.setItemId(UUID.randomUUID().toString());sourceContext.collect(orderItem);orderItem.setGoodsId("111");sourceContext.collect(orderItem);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//构建水印分配器(此处为了简单),直接使用系统时间了public static class GoodsWatermark implements WatermarkStrategy<Goods> {@Overridepublic TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Goods>() {@Overridepublic void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {@Overridepublic TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<OrderItem>() {@Overridepublic void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}
}
5.2 案例2
需求:
- 通过
keyBy
将两个流join
到一起 - interval join需要设置流A去关联哪个时间范围的流B中的元素。此处,我设置的下界为-1、上界为0,且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。
- process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中
/*** 双流join案例2** @author : YangLinWei* @createTime: 2022/3/8 11:20 下午*/
public class JoinDemo02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建商品数据流DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());// 构建订单明细数据流DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());// 进行关联查询SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId()).intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId())).between(Time.seconds(-1), Time.seconds(0)).upperBoundExclusive().process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {@Overridepublic void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {FactOrderItem factOrderItem = new FactOrderItem();factOrderItem.setGoodsId(right.getGoodsId());factOrderItem.setGoodsName(right.getGoodsName());factOrderItem.setCount(new BigDecimal(left.getCount()));factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));out.collect(factOrderItem);}});factOrderItemDS.print();env.execute("Interval JOIN");}//商品类@Datapublic static class Goods {private String goodsId;private String goodsName;private BigDecimal goodsPrice;public static List<Goods> GOODS_LIST;public static Random r;static {r = new Random();GOODS_LIST = new ArrayList<>();GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));}public static Goods randomGoods() {int rIndex = r.nextInt(GOODS_LIST.size());return GOODS_LIST.get(rIndex);}public Goods() {}public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {this.goodsId = goodsId;this.goodsName = goodsName;this.goodsPrice = goodsPrice;}@Overridepublic String toString() {return JSON.toJSONString(this);}}//订单明细类@Datapublic static class OrderItem {private String itemId;private String goodsId;private Integer count;@Overridepublic String toString() {return JSON.toJSONString(this);}}//关联结果@Datapublic static class FactOrderItem {private String goodsId;private String goodsName;private BigDecimal count;private BigDecimal totalMoney;@Overridepublic String toString() {return JSON.toJSONString(this);}}//构建一个商品Stream源(这个好比就是维表)public static class GoodsSource11 extends RichSourceFunction {private Boolean isCancel;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//构建订单明细Stream源public static class OrderItemSource extends RichSourceFunction {private Boolean isCancel;private Random r;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;r = new Random();}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods goods = Goods.randomGoods();OrderItem orderItem = new OrderItem();orderItem.setGoodsId(goods.getGoodsId());orderItem.setCount(r.nextInt(10) + 1);orderItem.setItemId(UUID.randomUUID().toString());sourceContext.collect(orderItem);orderItem.setGoodsId("111");sourceContext.collect(orderItem);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//构建水印分配器(此处为了简单),直接使用系统时间了public static class GoodsWatermark implements WatermarkStrategy<Goods> {@Overridepublic TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Goods>() {@Overridepublic void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {@Overridepublic TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<OrderItem>() {@Overridepublic void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}
}
06 文末
本文主要讲解了Flink双流join的高级特性,谢谢大家的阅读本文完!
Flink教程(20)- Flink高级特性(双流Join)相关推荐
- python是人都能学会_人人都能学会的python编程教程15:高级特性2
生成器 如果你想要一百万个数,而这些数里只有一百个数是你经常要用的,剩下的都几乎不怎么会用到,那么如果直接把这一百万个数全部放在list中是不明智的因为这会浪费较多存储空间,生成器就是为了解决这个问题 ...
- python 廖雪峰_廖雪峰的Python系列教程(20)——高级特性之生成器
生成器 通过列表生成式,我们可以直接创建一个列表.但是,受到内存限制,列表容量肯定是有限的.而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大 ...
- flink教程-聊聊 flink 1.11 中新的水印策略
文章目录 背景 新的水印生成接口 内置水印生成策略 固定延迟生成水印 单调递增生成水印 event时间的获取 处理空闲数据源 背景 在flink 1.11之前的版本中,提供了两种生成水印(Waterm ...
- 人人都能学会的python编程教程15:高级特性2
生成器 如果你想要一百万个数,而这些数里只有一百个数是你经常要用的,剩下的都几乎不怎么会用到,那么如果直接把这一百万个数全部放在list中是不明智的因为这会浪费较多存储空间,生成器就是为了解决这个问题 ...
- 人人都能学会的python编程教程14:高级特性1
切片 取一个list或tuple的部分元素是非常常见的操作.比如,一个list如下: L = ['Michael', 'Sarah', 'Tracy', 'Bob', 'Jack'] 第一个元素的索引 ...
- Flink教程(22)- Flink高级特性(异步IO)
文章目录 01 引言 02 异步IO 2.1 异步IO介绍 2.2 使用Aysnc I/O的前提条件 2.3 Async I/O API 03 案例演示 04 原理深入 4.1 AsyncDataSt ...
- Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)
文章目录 01 引言 02 FlinkSQL 整合Hive 2.1 介绍 2.2 集成Hive的基本方式 2.3 准备工作 2.4 SQL CLI 2.5 代码演示 03 文末 01 引言 在前面的博 ...
- Flink教程(24)- Flink高级特性(File Sink)
文章目录 01 引言 02 File Sink介绍 03 File Sink案例演示 04 文末 01 引言 在前面的博客,我们学习了Flink的Streaming File Sink了,有兴趣的同学 ...
- Flink教程(19)- Flink高级特性(BroadcastState)
文章目录 01 引言 02 BroadcastState介绍 03 BroadcastState API介绍 04 BroadcastState 案例 4.1 需求 4.2 编码步骤 4.3 编码实现 ...
最新文章
- centos 7 php mysql apache_CentOS 7 搭建 Apache+MySQL+PHP
- mysql象限和投影_Camera类之orthographic-摄像机投影模式(第100篇随笔)
- VB:使用Visual Studio 2010中的VB语言工具箱DataGridView调用SQL数据库Database的表格文件
- Implementing Synchronization Operations
- 简述什么是图灵机_什么是图灵机
- ubantu 黑屏_死机黑屏专题上线啦,早鸟只要299,看完薪水翻一番
- this和super关键字
- 冒泡排序解析 + 代码实现(C语言)
- hysys动态模拟教程_(转载)HYSYS-过程模拟软件-稳态模拟-第一部分(一)
- 科三——细则以及扣分点
- ps4如何设置虚拟服务器,PS5官方使用指南 账号设置/PS4传输数据/调整主机设定
- 【Cocos Creator实战教程(6)】——镜头跟随
- nice,​使用python生成专属二维码~
- 电脑没声音,喇叭上一个叉,显示无法找到输入输出设备(录制,耳机等等)
- LTE 终端如何申请 RB 资源以及现实面临的问题
- word 无法复制粘贴怎么办
- 不要让回忆有负疚感——职业规划法则一
- Google谷歌权重09年算法
- html静态页面作业——酷酷动物主题响应式网页(5页) 大学生动物主题网页作品 动物网页设计作业模板 学生网页制作源代码下载
- mybatisPlus 实体类与数据库表映射关系