09-flink-Accumulator(累加器)

概念

Accumulator(累加器):累加器主要作用在用户操作(operate)中收集分布式统计信息或聚合信息。每个并行实例创建并更新其自己的累加器对象,不同并行实例的累加器由系统作业结束后合并。结果可以从作业执行的结果中获得,也可以从web运行时监视器中获得。

分类

IntCounter

LongCounter

DoubleCounter

Histogram

自定义(实现SimpleAccumulator接口)

用法

创建累加器:private IntCounter numLines = new IntCounter();

备注:在operate中使用

注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);

备注:operate 实现 Rich...接口

使用累加器:this.numLines.add(1);

获取结果:

JobExecutionResult myJobExecutionResult = env.execute("accumulators Test");

myJobExecutionResult.getAccumulatorResult("num-lines")

例子

public class CounterTest {

public static void main(String[] args) throws Exception {

//获取执行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//--hostname 10.24.14.193 --port 9000

final ParameterTool params = ParameterTool.fromArgs(args);

String hostname = params.has("hostname") ? params.get("hostname") : "localhost";

int port = params.has("port") ? params.getInt("port") : 9000;

System.out.println("hostName=" + hostname + " port=" + port);

//数据来源

DataStream text = env.socketTextStream(hostname, port, "\n");

//operate

text.map(new RichMapFunction() {

//第一步:定义累加器

private IntCounter numLines = new IntCounter();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

//第二步:注册累加器

getRuntimeContext().addAccumulator("num-lines", this.numLines);

}

@Override

public String map(String s) throws Exception {

//第三步:累加

this.numLines.add(1);

return s;

}

});

//数据去向

text.print();

//执行

JobExecutionResult socketTest = env.execute("socketTest");

//第四步:结束后输出总量;如果不需要结束后持久化,可以省去,因为在flinkUI中可以看到

//String total = socketTest.getAccumulatorResult("num-lines").toString();

}

sparkUI 查看

flinkUI-Accumulators.png

java 累加器_09-flink-Accumulator(累加器)相关推荐

  1. Flink的累加器(Accumulator)应用

    1. 累加器的简单介绍 累加器是从用户函数和操作中,分布式地统计或者聚合信息.每个并行实例创建并更新自己的Accumulator对象, 然后合并收集器的不同并行实例.在作业结束时由系统合并. 累加器的 ...

  2. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  3. Spark 之 Accumulator 累加器

    Spark Accumulator 累加器作用 源码 累加器原理图 Spark中累加器的执行流程: 累加器使用demo spark ui 使用累加器中可能遇到的坑 线程安全问题 累加器作用 累加器:分 ...

  4. 累加器 java_Spark笔记之累加器(Accumulator)

    一.累加器简介 在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark St ...

  5. Spark Accumulator累加器

    什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...

  6. Accumulator累加器(一)

    累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...

  7. java 累加器_Spark基础教程——累加器的使用(JAVA)

    通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱 ...

  8. PySpark 累加器使用及自定义累加器

    累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每 ...

  9. 计算机累加器有加法器功能吗,累加器是什么_累加器的作用及原理介绍

    累加器的概念 在中央处理器中,累加器(accumulator)是一种寄存器,用来储存计算产生的中间结果.如果没有像累加器这样的寄存器,那么在每次计算(加法,乘法,移位等等)后就必须要把结果写回到内存, ...

  10. [源码解析] 当 Java Stream 遇见 Flink

    [源码解析] 当 Java Stream 遇见 Flink 文章目录 [源码解析] 当 Java Stream 遇见 Flink 0x00 摘要 0x01 领域 1.1 Flink 1.2 Java ...

最新文章

  1. 大话文本分类之Fnet
  2. golang 的AES加解密 (CBC/ECB/CFB 模式)
  3. HL7 Figure 2-1. Delimiter values(分隔符值)
  4. 【网址收藏】在win10中使用Minikube安装Kubernetes
  5. BZOJ1036 (其实这只是一份板子)
  6. servlet中弹出网页对话框
  7. 宝塔linux忘记密码,宝塔忘记登录入口了怎么解决 宝塔面板密码忘记了怎么办
  8. 安卓开发语言php,go语言支持安卓开发吗
  9. Matlab 图像采集工具的使用 - Image Acquisition Toolbox【IAT】 + 大恒相机的应用【1】+多个摄像头支持
  10. 10款主流的软件测试工具,你用过吗?
  11. C# 中的字符串内插
  12. 【JVM学习笔记一】JVM内存分布
  13. 通过消息跨进程发送与接收 TCopyDataStruct 数据 - 回复 skymesh
  14. 解决安卓4.4webview的兼容性问题
  15. linux gpu 地址空间,显卡内存和进程的虚拟地址空间
  16. 如何选择适合你的兴趣爱好(十五),油画
  17. 用anaconda启动Spyder报错
  18. 数学建模竞赛必须要掌握的十个算法
  19. 什么是JPA?SpringBoot 中使用JPA
  20. springboot中文官方文档

热门文章

  1. 如何做好国产三维CAD软件的研发?
  2. bilibili缓存视频的位置
  3. 使用aria2为网盘下载加速
  4. CPU使用率过高如何排查问题
  5. 微信 8.0 的状态原来这么炸裂,无情地爱了爱了,做程序员的你还不赶紧设置一把?
  6. 如何高效进行出货复核作业,提升出货准确率?
  7. 关于新加坡IT薪酬和找工作网站
  8. 全球哪儿找工作最容易? 新加坡居榜首香港排第九
  9. Mysql为何不推荐写多表SQL
  10. 2019年的最新的最全的ava常见的面试题