文章目录

  • Accumulator源码分析学习
    • Accumulator源码结构
      • Accumulator 结构
        • 成员方法
      • SimpleAccumulator
      • 累加器使用案例
        • code

Accumulator源码分析学习

Accumulator源码结构

Flink 的Accumulator是一个累加器,和spark的累加器是一个道理

Accumulator 结构

public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
}

成员方法

  • add
  • getLocalValue
  • resetLocal
  • merge
  • clone

SimpleAccumulator

源码中SimpleAccumulator接口继承了累加器,供上层的其他累加器实现这个接口,然后实现具体逻辑

@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T,T> {}
  1. IntCounter,LongCounter,DoubleCounter:允许将 TaskManager 发送的 int,long,double 值汇总在一起
  2. LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum:累加器,用于确定不同类型的最大值和最小值

源码中可以看出有很多具体的不同功能的实现类都实现了这个接口

累加器使用案例

code

lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {//创建一个累加器private IntCounter linesNum = new IntCounter();@Overridepublic void open(Configuration parameters) throws Exception {//注册一个累加器getRuntimeContext().addAccumulator("linesNum", linesNum);}@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split("\\W+");for (String word : words) {out.collect(new Tuple2<>(word, 1));}// 处理每一行数据后 linesNum 递增linesNum.add(1);}
})
.groupBy(0)
.sum(1)
.print();//获取累加器结果
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);

【FLink源码分析】:Accumulator源码分析相关推荐

  1. 老李推荐:第5章5节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 获取系统服务引用 1...

    老李推荐:第5章5节<MonkeyRunner源码剖析>Monkey原理分析-启动运行: 获取系统服务引用 上一节我们描述了monkey的命令处理入口函数run是如何调用optionPro ...

  2. 手机自动化测试:Appium源码分析之跟踪代码分析四 1

    手机自动化测试:Appium源码分析之跟踪代码分析四 控制器模块 // Appium webserver controller methods // https://github.com/hugs/a ...

  3. 【原创】【专栏】《Linux设备驱动程序》--- LDD3源码目录结构和源码分析经典链接

    http://blog.csdn.net/geng823/article/details/37567557 [原创][专栏]<Linux设备驱动程序>--- LDD3源码目录结构和源码分析 ...

  4. 【Linux 内核】进程管理 ( 进程相关系统调用源码分析 | fork() 源码 | vfork() 源码 | clone() 源码 | _do_fork() 源码 | do_fork() 源码 )

    文章目录 一.fork 系统调用源码 二.vfork 系统调用源码 三.clone 系统调用源码 四._do_fork 函数源码 五.do_fork 函数源码 Linux 进程相关 " 系统 ...

  5. 【Android 热修复】热修复原理 ( 类加载分析 | 分析 PathClassLoader 源码 | 分析 BaseDexClassLoader 源码 | 分析 PathDexList 源码 )

    文章目录 一.分析 PathClassLoader 源码 二.分析 BaseDexClassLoader 源码 三.分析 PathDexList 源码 四. 源码资源 一.分析 PathClassLo ...

  6. 【Android 电量优化】JobScheduler 源码分析 ( JobServiceContext 源码分析 | 闭环操作总结 | 用户提交任务 | 广播接收者接受相关广播触发任务执行 )★

    文章目录 一.JobServiceContext 引入 二.JobServiceContext 源码分析 三.用户在应用层如何使用 JobScheduler 四.用户提交任务 五.广播接收者监听广播触 ...

  7. 老李推荐:第6章1节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览 1...

    老李推荐:第6章1节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览 在上一章中我们有简要的介绍了事件源是怎么一回事,但是并没有进行详细的描述.那么往下的这几个 ...

  8. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  9. Android源码分析--MediaServer源码分析(二)

    在上一篇博客中Android源码分析–MediaServer源码分析(一),我们知道了ProcessState和defaultServiceManager,在分析源码的过程中,我们被Android的B ...

最新文章

  1. python基础——logging、OS、sys、random、string模块(0424)
  2. HDU - 4280 Island Transport(最大流)
  3. 2018届应届生android面试总结
  4. php+正则匹配qq,正则表达式对qq号进行验证的实例
  5. Java 10新特性解密
  6. Long Way To Go 之 Python 5 (2)
  7. leetcode题解62-不同路径
  8. centos安装配置tftp服务器
  9. 淘宝、飞猪、闲鱼都挂了,阿里云却正常?!
  10. SQL Server 复制:事务发布
  11. Linux操作系统基础原理
  12. 记:判断三个点是否在一条直线上
  13. 5WHY分析法 学习笔记
  14. 20200727《Flask进阶》——模块化程序(蓝本)
  15. Python爬虫,30秒爬取500+篇微信文章!太强啦!
  16. 【云原生】设备云之FlexManager边缘计算[Javascript]
  17. 宗镜录略讲——南怀瑾老师——系列11
  18. 说Python已经饱和的人,你一定不懂这个技能
  19. 哪位高手能帮小弟找个没有病毒能用的穿越火线自动准备刷分挂呀?
  20. Origin如何使用基础功能?

热门文章

  1. 方舟原始恐惧mod生物代码_《方舟:恐惧进化4》登场,各种幽灵生物你成功驯服了吗?...
  2. C语言读取文件函数:fgets(); fgetc()
  3. 简单介绍Vue使用echarts定制特殊的仪表盘
  4. 关键词分析-对同行网站进行全面的分析-免费关键词分析工具
  5. 边沿检测与提取,轮廓跟踪与Hough变换(转)
  6. python执行cmd命令,并获得返回值
  7. php 微擎钻石投票二开,[模块插件]微擎钻石投票男神女神公众号投票系统完美运营版其他-(微信)小程序...
  8. app审核被拒:App Tracking Transparency permission request when reviewed on iOS 15.0
  9. 【组队学习】Task06:FOR、IF、以及while
  10. 使用Linux Tc实现入向和出向限速