1.概述

1.1背景

这个实验特性应该是在Flink 1.5版本已经引进,但是直到现在(1.11)仍然是实验特性。官网对于它的描述 :这个特性仍然在不断的优化,目前是可能是不稳定、不兼容的,并且在以后的版本甚至发生大的改变。

1.2 作用

将DataStream重新解释为KeyedStream,这种方式可以避免shuffle

那么自然它的使用也会受到相应的约束,这个只能去重新解释那些已经预分区的DataStream。

1.3 官网例子

在源码中找到了这样一个测试代码,结果是:Tests passed

public class ReinterpretAsKeyedStreamDemo {public void reinterpretAsKeyedStream() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3);KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value;}});SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2).reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}});reducer.addSink(new PrintSinkFunction<>());env.execute("xx");}}

上面结果我们可以看到 输出了2 4 6 其实就是

1 + 1 = 2 2 + 2 = 43 + 3 = 6

但是我们其实有9条数据,1,2,3分别是3组数据,为什么少输出呢?

因为前面两组1,2,3已经结束了一个窗口,满足同一个key下有两个数据,然后最后一组的1,2,3,并不满足有两个数据,无法触发窗口。

为了方便理解我们再次修改如下代码:

 public void reinterpretAsKeyedStream() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3,2);KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value;}});SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2).reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}});reducer.addSink(new PrintSinkFunction<>());env.execute("xx");}

运行结果如下

2
4
6
4

我们数据源数据里面最后加入了一个2,然后最后输出多了一个4。当然这个是countwindow的使用,因为官网例子给的不明确,这里只是简单给大家补充一下,便于理解,避免初次使用产生太多疑问。

1.3.1 实战Demo分析

代码功能:

  1. 从文件中读取数据然后构建ds1:DataStream[Event]流,然后输出文件数据;

  2. 接着ds1流会根据Event的字段 key 进行keyby操作,使用一个窗口大小为2的CountWindow,然后保留这两条数据中 partition 字段值最大的一条数据,构建数据流ds2:DataStreamp[Event]

  3. 最后我们继续使用一个窗口大小为2的CountWindow,然后对窗口内两条数据处理:

    • 如果两条数据的event_type字段值不等,那么我们使用第一条数据的值去创建一个Event对象,然后新数据Event对象的event_type字段设置为3,并且把字段 v 设置为两个数据的字段 v 的字符串拼接;

    • 如果event_type字段值相等,那么我们保留time_字段值大的一条数据。

第三步中,正常情况我们会对ds2进行keyby然后继续按照key 字段值hash,这样会产生相应的Shuffle,但是通过使用本文的实验特性reinterpretAsKeyedStream,可以避免Shuffle。


// scala
object SessionwindowingOriginal {// 主函数def main(args: Array[String]): Unit = {Logger.getRootLogger.setLevel(Level.WARN)val params = ParameterTool.fromArgs(args)val env = StreamExecutionEnvironment.createLocalEnvironment(2)env.getConfig.setGlobalJobParameters(params)env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.setParallelism(2)env.setMaxParallelism(2)// 从文件读取数据 ds1:DataStream[Event]类型val ds1 = env.readTextFile("/Users/hehuiyuan/gitwarehouse/flinksql/src/main/resources/f1").map(e => {val l = e.split(",")val (key, time_, event_type, v, partition) = (l(0).trim, l(1).trim.toLong, l(2).trim.toInt, l(3).trim, l(4).trim)Event(key, time_, event_type, v, partition)}).name("f1_source")//输出原始数据ds1.addSink(new SinkFunction[Event] {override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("原始数据:"+value.toString)}).name("origin_data_sink")//按照event对象的key字段分组// 相同key下 窗口大小数据量是2,然后取partition字段取最大的数据val ds2 = ds1.keyBy(_.key).countWindow(2).max("partition")// 输出ds2:DataStream[Event]ds2.addSink(new SinkFunction[Event] {override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("ds2:"+value.toString)}).name("ds2")//ds2是DataSteam,ds1按照字段key分区处理后得到的流//此时还想继续使用KededStream的一些操作,需要把ds2进行keyby// 但是会存在shuffle,key不变情况下,我们可以直接把DataStream变为KeyedStreamval aggregated = new DataStreamUtils(ds2).reinterpretAsKeyedStream((event) => event.key).countWindow(2).reduce((e1, e2) =>if(e1.event_type != e2.event_type)Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)else if(e2.time_ > e1.time_) e2else e1).addSink(new SinkFunction[Event] {override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =System.out.println(value.toString)}).name("result")env.execute()}}

我们看一下读取的文件中的数据样式:

每一行都会被封装到一个Event对象中,然后构成DataStream。


//创建一个Pojo类,4个字段
case class Event(key: String,time_ : Long,event_type: Int,v: String,partition: String)

Event对象有五个字段,会使用逗号分割


输出结果:


原始数据:Event(a,1,1,banana,0)
原始数据:Event(b,21,2,tomato,0)
原始数据:Event(c,12,1,apple,0)
原始数据:Event(d,10,2,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
原始数据:Event(a,3,1,ba,1)
原始数据:Event(b,11,2,to,0)
原始数据:Event(c,42,1,ap,0)
原始数据:Event(d,20,2,or,0)
原始数据:Event(e,111,2,wa,0)
ds2:Event(a,1,1,banana,1)
ds2:Event(b,21,2,tomato,0)
ds2:Event(c,12,1,apple,0)
ds2:Event(d,10,2,orange,0)
ds2:Event(e,101,1,watermeleon,0)
原始数据:Event(a,2,1,ba,0)
原始数据:Event(b,2,2,to,0)
原始数据:Event(c,88,1,ap,0)
原始数据:Event(d,44,2,or,0)
原始数据:Event(e,11,2,wa,0)
原始数据:Event(a,33,1,banana,0)
原始数据:Event(b,21,2,tomato,1)
原始数据:Event(c,55,2,apple,0)
原始数据:Event(d,66,1,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
ds2:Event(a,2,1,ba,0)
ds2:Event(b,2,2,to,1)
Event(a,2,1,ba,0)
Event(b,21,2,tomato,0)
ds2:Event(c,88,1,ap,0)
Event(c,88,1,ap,0)
ds2:Event(d,44,2,or,0)
Event(d,44,2,or,0)
ds2:Event(e,11,2,wa,0)
Event(e,101,3,wa_watermeleon,0)

我们拿其中一个输出结果的数据简单分析一下:(上面最后一行)

Event(e,101,3,wa_watermeleon,0)

那么这个数据是如何输出的呢?

我们会发现ds1经过keyby以及counwindow后的max处理以后,留下了两条数据:


ds2:Event(e,101,1,watermeleon,0)
ds2:Event(e,11,2,wa,0)

紧接着,把数据流ds2转为keyedStream,然后又做了一次CountWindow操作,窗口大小是2,具体实现的代码我们下面分析,这里先把结果分析完:

因为上面对于key = e 下,满足了两条数据,也就是满足了countwindow的触发计算,这个时候会对这两个数据处理,根据我们第三步功能描述可知处理如下:

event_type = 1 event_type = 2

这两条数据的该字段不等,根据(3.1)可知,会创建一个新的Event对象,该对象的 event_type = 3, v = wa_watermeleon(两条数据的该字段的字符串拼接构成)

最终得到如下结果:

Event(e,101,3,wa_watermeleon,0)

最后,我们对ds2使用了本文的主要介绍的特性reinterpretAsKeyedStream进行分析,这个方法在DataStreamUtils中。

使用reinterpretAsKeyedStream的代码:

val aggregated = new DataStreamUtils(ds2).reinterpretAsKeyedStream((event) => event.key).countWindow(2).reduce((e1, e2) =>if(e1.event_type != e2.event_type)Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)else if(e2.time_ > e1.time_) e2else e1).addSink(new SinkFunction[Event] {override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =System.out.println(value.toString)}).name("result")

不用reinterpretAsKeyedStream的代码:

val aggregated = ds2.keyBy(_.key).countWindow(2).reduce((e1, e2) =>if(e1.event_type != e2.event_type)Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)else if(e2.time_ > e1.time_) e2else e1).addSink(new SinkFunction[Event] {override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =System.out.println(value.toString)}).name("result")

在这里就涉及到使用reinterpretAsKeyedStream的优势了,可能代码你无法更好的体会,我们通过StreamGraph来了解这两者的区别:


图片可能有点小,我们把关键地方放大查看:


在这里我们可以发现,同样是Window Operator,但是第一个Window Operator 的数据是通过上游HASH过来的,第二个是通过FORWARD方式过来

两个Operator之间的边展示的关键词,其实展示了两个算子之间数据是如何传输的,在之前的文章提到过关于partition的概念以及Flink已经提供的实现,此处阅读 。

2.重点源码分析

public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> stream,KeySelector<T, K> keySelector,TypeInformation<K> typeInfo) {PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(stream.getTransformation(),new ForwardPartitioner<>());return new KeyedStream<>(stream,partitionTransformation,keySelector,typeInfo);}

上面代码是 方法reinterpretAsKeyedStream的具体实现,最后我们可以看到return了一个KeyedStream流,创建这个流的时候首先创建了PartitionTransformation对象,其中使用了ForwardPartitioner分区器,那么FORWARD其实也是来源于此。

我们看一下keyby操作如何生产KeyedStream的:

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {Preconditions.checkNotNull(key);Preconditions.checkNotNull(keyType);return new KeyedStream<>(this, clean(key), keyType);}public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {this(dataStream,new PartitionTransformation<>(dataStream.getTransformation(),new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),keySelector,keyType);}

上面代码我们可以看出,keyby同样构建keyedStream流,但是使用的分区器是KeyGroupStreamPartitioner。

M.概述

转载:Flink实验特性–reinterpretAsKeyedStream

【Flink】Flink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream相关推荐

  1. Flink 1.9 特性学习和Blink SQL Parser 功能使用

    前言 本文对 Flink 1.9版本特性进行了解读(基于社区邮件组讨论),同时对Blink 开源版本 flink-sql-parser 模块进行学习了解,和大家一起交流分享. 1. Flink 1.9 ...

  2. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  3. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  4. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  5. Chrome内核浏览器保存实验特性配置(://flags)的方法

    首先,通过浏览器手动修改实验特性配置 具体教程CopyDN一搜,有十万条重复教程,还可能会附带非常贴心的过时flag列表以供查询. 不同浏览器开放的实验特性也不同,即使是同内核版本的不同浏览器,也可能 ...

  6. 【期末满分作业】C语言程序设计 实训1——奖学金评定系统的设计与实现(附带实验报告、源码以及解释)

    大家好,各位努力奋斗的大学生小伙伴们!今天,我将带你们领略一项令人惊叹的程序设计奇迹--<奖学金评定系统>!是不是感到激动呢?别急,让我为你们揭开这个能让你在C语言程序设计中拿满分的秘密武 ...

  7. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  8. [Flink]Flink DataStream window join 和interval join

    目录 window join interval join window join 窗口连接把两个流中相同窗口通过一个键值连接起来.然后,两边的元素被传递到用户定义的JoinFunction或FlatJ ...

  9. [Flink]Flink常用的DataStream转换算子

    目录 3.1 Map 3.2 FlatMap 3.3 Filter 3.4 KeyBy 3.5 Reduce 3.6 Fold 3.7 Aggregations 3.8 Window 3.9 Wind ...

最新文章

  1. 创建性设计模式之2--建造者模式
  2. 51nod 2006 飞行员配对(二分图最大匹配) 裸匈牙利算法 求二分图最大匹配题
  3. 每天一道LeetCode-----找到第k个排列
  4. 设计模式 简单工厂模式
  5. 代做html网页多少钱,代做排名网站有吗,做排名帮你实现财富自由
  6. 远程桌面复制文件,由于网络或其他原因被意外中断,后来再连上远程桌面就无法复制了,而且复制文件的对话框也无法取消,可以试试下面的方法,实测有效:
  7. html的过渡属性,CSS3属性transition(过渡)多属性详解
  8. 什么叫企业级即时通讯软件
  9. Android开发笔记(一百七十一)使用Glide加载网络图片
  10. 10条建议让你创建更好的jQuery插件(转载)
  11. 重新启动postgre报错时,解决方案 ( 由备份文件占用空间太大造成 ) (linux 命令 df -h 查看磁盘空间)
  12. zwPython,字王集成式python开发平台,比pythonXY更强大、更方便。
  13. 数据挖掘工程师笔试及答案整理
  14. android仿微信红包动画,如何在Android中实现一个硬币转动微信红包动画效果
  15. idea Translation 使用有道翻译
  16. 什么是消息队列(Message queue)
  17. DNS地址,DNS服务器作用
  18. 网页前端设计一般思路
  19. Arduino 工控板开发
  20. 服务器怎么做无限耐久装备,饥荒物品无限耐久控制台指令 | 手游网游页游攻略大全...

热门文章

  1. 豆瓣再被约谈处罚150万!一年被罚20次,豆瓣到底怎么了?
  2. 新的“钉子户”来了!一加9RT将于10月13日正式亮相
  3. 苹果官宣:这届“春晚”,好早!
  4. 视频 | 为何我对小鹏NGP“半信半疑”
  5. Facebook入局视频会议,日活用户超3亿的Zoom股价应声下跌,Zoom为何不扛打?
  6. 拼多多联合三奇医卫等企业,每天上架1000万只平价口罩
  7. ​2020启示:拼多多篇— —退潮后,你才发现人家是游泳健将
  8. 库克笑嘻嘻!苹果明年或将迎来继iPhone 6后第二次换机大潮
  9. OPPO Reno 5G版高调宣布上市时间:已具备上市条件
  10. 3月19日发布!vivo X27配置揭晓:搭载骁龙710处理器