1.异常情况

使用 SlidingEventTimeWindows 窗口,后面直接一个reduce算子,数据会出现倍增情况。

代码:

DataStreamSource<String> localhost = env.socketTextStream("localhost", 9991, "\n");final SingleOutputStreamOperator<DataEntity> dataSteam = localhost.flatMap(new FlatMapFunction<String, DataEntity>() {@Overridepublic void flatMap(String value, Collector<DataEntity> out) throws Exception {try {DataEntity dataEntity = new DataEntity();dataEntity.setCount(1);dataEntity.setKey("121212");dataEntity.setEventTime(System.currentTimeMillis());out.collect(dataEntity);} catch (Exception e) {int p = 0;}}});dataSteam.print("source-");SingleOutputStreamOperator<DataEntity> datawmStream = dataSteam.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<DataEntity>(Time.seconds(2)) {@Overridepublic long extractTimestamp(DataEntity element) {return element.getEventTime();}});datawmStream.keyBy(DataEntity::getKey).window(SlidingEventTimeWindows.of(Time.days(2), Time.days(1)))
//              .window(TumblingEventTimeWindows.of(Time.minutes(20))).trigger(CountTrigger.of(1)).reduce(new ReduceFunction<DataEntity>() {@Overridepublic DataEntity reduce(DataEntity value1, DataEntity value2) throws Exception {final Integer c1 = value1.getCount();final Integer c2 = value2.getCount();value1.setCount(c1 + c2);return value1;}}).print();

这个window地方,如果修改为TumblingEventTimeWindows,数据计算是正确的,每次都是加1,但是使用SlidingEventTimeWindows就会出现倍增情况,代码中的SlidingEventTimeWindows,其中size 是2天,slide是1天。那么SlidingEventTimeWindows中,每个数据到达窗口后,先有一个窗口分发器,将每个数据进行分配窗口,SlidingEventTimeWindows中一条数据会被分配到 size/slide 个窗口中,此时就会出现问题,当一个窗口当值计算完后,下个窗口进行计算时,element的值会变成上个窗口计算后的值。

源码分析:

WindowOperator.processElement(StreamRecord<IN> element);

窗口没有合并的需求,所以跳过多余的代码

@Override
public void processElement(StreamRecord<IN> element) throws Exception {// 窗口分发器,SlidingEventTimeWindows 会将一个element分给多个windowfinal Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);//if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {.....} else {// 遍历每个窗口for (W window: elementWindows) {// drop if the window is already lateif (isWindowLate(window)) {continue;}isSkippedElement = false;// 设置窗口windowState.setCurrentNamespace(window);// 数据存储到state,reduce计算都是在这个里完成的windowState.add(element.getValue());triggerContext.key = key;triggerContext.window = window;TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}// 触发计算时,将数据发射出去emitWindowContents(window, contents);}if (triggerResult.isPurge()) {windowState.clear();}// 定时器清理数据registerCleanupTimer(window);}}

Flink跟着问题读源码 - SlidingEventTimeWindows接reduce结果数据倍增相关推荐

  1. FATE学习:跟着日志读源码(九)upload任务job finsih阶段

    综述 同task,job结束时,并不会返回job finish 的信息,也是在DagScheduler.schedule_running_job() 轮询时,通过calculate_job_statu ...

  2. 微信读书vscode插件_跟我一起读源码 – 如何阅读开源代码

    阅读是最好的老师 在学习和提升编程技术的时候,通过阅读高质量的源码,来学习专家写的高质量的代码,是一种非常有效的提升自我的方式.程序员群体是一群乐于分享的群体,因此在互联网上有大量的高质量开源项目,阅 ...

  3. 我是怎么读源码的,授之以渔

    点击上方"视学算法",选择"设为星标" 做积极的人,而不是积极废人 作者 :youzhibing 链接 :https://www.cnblogs.com/you ...

  4. 这样读源码,不牛X也难

    程序员在工作过程中,会遇到很多需要阅读源码的场景,比如技术预研.选择技术框架.接手以前的项目.review他人的代码.维护老产品等等.可以说,阅读源代码是程序员的基本功,这项基本功是否扎实,会在很大程 ...

  5. myisam怎么读_耗时半年,我成功“逆袭”,拿下美团offer(刷面试题+读源码+项目准备)...

    欢迎关注专栏[以架构赢天下]--每天持续分享Java相关知识点 以架构赢天下​zhuanlan.zhihu.com 以架构赢天下--持续分享Java相关知识点 每篇文章首发此专栏 欢迎各路Java程序 ...

  6. 读源码,对程序员重要吗?

    来源: CSDN(ID:CSDNnews) 嘿,朋友们!本文我将分享一些关于主动阅读和研究源码的一些想法.在我看来,阅读源码能够帮你成为一名更专业的开发人员.毫无疑问的是,阅读源码提高了我的软件开发水 ...

  7. 夜读源码,带你探究 Go 语言的iota

    Go 语言的 iota 怎么说呢,感觉像枚举,又有点不像枚举,它的底层是什么样的,用哪个姿势使用才算正规,今天转载一篇「Go夜读」社区上分享的文章,咱们一起学习下.Go 夜读,带你每页读源码~!  这 ...

  8. 【flink】Flink 1.12.2 源码浅析 : Task数据输入

    1.概述 转载:Flink 1.12.2 源码浅析 : Task数据输入 在 Task 中,InputGate 是对输入的封装,InputGate 是和 JobGraph 中 JobEdge 一一对应 ...

  9. 【flink】Flink 1.12.2 源码浅析 :Task数据输出

    1.概述 转载:Flink 1.12.2 源码浅析 :Task数据输出 Stream的计算模型采用的是PUSH模式, 上游主动向下游推送数据, 上下游之间采用生产者-消费者模式, 下游收到数据触发计算 ...

最新文章

  1. net start mysql 提示:服务名无效 请键入NET HELPING 2185以获得更多的帮助的问题
  2. oracle minus 与sqlserver except
  3. 【C#】【APK】APK文件解析AXML-层层深入APK文件解析之一
  4. mysql命令行执行时不输出列名(字段名)
  5. 浮点数用大小端存储吗_干细胞存储有什么用,干细胞有必要存储吗
  6. java并发编程-volatile内存实现和原理
  7. c语言 判断一个图是否全连通_【连载】(判断执行语句)乐创DIY C语言讲义——3.8节(2)...
  8. Gulp解决发布线上文件(CSS和JS)缓存问题
  9. 推理框架runtime的动态输入、输出思路
  10. win10 python ffmpeg推流到b站
  11. 正定矩阵(Positive Definite Matrices)、半正定矩阵(Positive Semidefinite Matrices)
  12. 天津高一计算机会考,天津市高中信息技术会考不通过可以参加高考吗
  13. excel 中vb组合框_Excel 2013中的工作表组合框问题
  14. Find 7 Faster Than John Von Neumann
  15. 2021年全球支付现状及发展趋势分析:亚太电子商务数字支付将超过3.1万亿美元[图]
  16. Oracle 11g 的下载与安装
  17. 2014年计算机专业硕士研究生好就业吗?一位计算机专业硕士毕业生的求职经历和感想
  18. java 写一个简易闹钟
  19. 英特尔one API——AI为科技加速
  20. CMDB——概念详解

热门文章

  1. 国产监控摄像头占领英国市场 专家怕言行被掌控
  2. 使用HTML 5/CSS3五步快速制作便签贴特效
  3. Cobalt Strike使用教程一
  4. 更新了一下自己的POJO代码生成器
  5. 揭秘OPhone白手起家前后:一个系统的诞生
  6. ##04- Optional
  7. 基于OHCI的USB主机 —— UFI读状态代码
  8. 基于Jquery实现海底掘金版打豆豆
  9. Mac和网页版Skype更新聊天机器人功能
  10. A“一个部族,一个民族,一个弗雷尔卓德。”(素数筛,逆序对,树状数组)...