java 累加器_09-flink-Accumulator(累加器)
09-flink-Accumulator(累加器)
概念
Accumulator(累加器):累加器主要作用在用户操作(operate)中收集分布式统计信息或聚合信息。每个并行实例创建并更新其自己的累加器对象,不同并行实例的累加器由系统作业结束后合并。结果可以从作业执行的结果中获得,也可以从web运行时监视器中获得。
分类
IntCounter
LongCounter
DoubleCounter
Histogram
自定义(实现SimpleAccumulator接口)
用法
创建累加器:private IntCounter numLines = new IntCounter();
备注:在operate中使用
注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);
备注:operate 实现 Rich...接口
使用累加器:this.numLines.add(1);
获取结果:
JobExecutionResult myJobExecutionResult = env.execute("accumulators Test");
myJobExecutionResult.getAccumulatorResult("num-lines")
例子
public class CounterTest {
public static void main(String[] args) throws Exception {
//获取执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//--hostname 10.24.14.193 --port 9000
final ParameterTool params = ParameterTool.fromArgs(args);
String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
int port = params.has("port") ? params.getInt("port") : 9000;
System.out.println("hostName=" + hostname + " port=" + port);
//数据来源
DataStream text = env.socketTextStream(hostname, port, "\n");
//operate
text.map(new RichMapFunction() {
//第一步:定义累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//第二步:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
}
@Override
public String map(String s) throws Exception {
//第三步:累加
this.numLines.add(1);
return s;
}
});
//数据去向
text.print();
//执行
JobExecutionResult socketTest = env.execute("socketTest");
//第四步:结束后输出总量;如果不需要结束后持久化,可以省去,因为在flinkUI中可以看到
//String total = socketTest.getAccumulatorResult("num-lines").toString();
}
sparkUI 查看
flinkUI-Accumulators.png
java 累加器_09-flink-Accumulator(累加器)相关推荐
- Flink的累加器(Accumulator)应用
1. 累加器的简单介绍 累加器是从用户函数和操作中,分布式地统计或者聚合信息.每个并行实例创建并更新自己的Accumulator对象, 然后合并收集器的不同并行实例.在作业结束时由系统合并. 累加器的 ...
- Flink的累加器和广播变量、广播流、分布式缓存
1.Accumulator累加器 Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...
- Spark 之 Accumulator 累加器
Spark Accumulator 累加器作用 源码 累加器原理图 Spark中累加器的执行流程: 累加器使用demo spark ui 使用累加器中可能遇到的坑 线程安全问题 累加器作用 累加器:分 ...
- 累加器 java_Spark笔记之累加器(Accumulator)
一.累加器简介 在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark St ...
- Spark Accumulator累加器
什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...
- Accumulator累加器(一)
累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...
- java 累加器_Spark基础教程——累加器的使用(JAVA)
通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱 ...
- PySpark 累加器使用及自定义累加器
累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每 ...
- 计算机累加器有加法器功能吗,累加器是什么_累加器的作用及原理介绍
累加器的概念 在中央处理器中,累加器(accumulator)是一种寄存器,用来储存计算产生的中间结果.如果没有像累加器这样的寄存器,那么在每次计算(加法,乘法,移位等等)后就必须要把结果写回到内存, ...
- [源码解析] 当 Java Stream 遇见 Flink
[源码解析] 当 Java Stream 遇见 Flink 文章目录 [源码解析] 当 Java Stream 遇见 Flink 0x00 摘要 0x01 领域 1.1 Flink 1.2 Java ...
最新文章
- 大话文本分类之Fnet
- golang 的AES加解密 (CBC/ECB/CFB 模式)
- HL7 Figure 2-1. Delimiter values(分隔符值)
- 【网址收藏】在win10中使用Minikube安装Kubernetes
- BZOJ1036 (其实这只是一份板子)
- servlet中弹出网页对话框
- 宝塔linux忘记密码,宝塔忘记登录入口了怎么解决 宝塔面板密码忘记了怎么办
- 安卓开发语言php,go语言支持安卓开发吗
- Matlab 图像采集工具的使用 - Image Acquisition Toolbox【IAT】 + 大恒相机的应用【1】+多个摄像头支持
- 10款主流的软件测试工具,你用过吗?
- C# 中的字符串内插
- 【JVM学习笔记一】JVM内存分布
- 通过消息跨进程发送与接收 TCopyDataStruct 数据 - 回复 skymesh
- 解决安卓4.4webview的兼容性问题
- linux gpu 地址空间,显卡内存和进程的虚拟地址空间
- 如何选择适合你的兴趣爱好(十五),油画
- 用anaconda启动Spyder报错
- 数学建模竞赛必须要掌握的十个算法
- 什么是JPA?SpringBoot 中使用JPA
- springboot中文官方文档