累加器 java_Spark笔记之累加器(Accumulator)
一、累加器简介
在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。
使用累加器时需要注意只有Driver能够取到累加器的值,Task端进行的是累加操作。
创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名,下面探讨如何在Spark Web UI上查看累加器的值。
示例代码:package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* @author CC11001100
*/
public class SparkWebUIShowAccumulatorDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
LongAccumulator fooCount = spark.sparkContext().longAccumulator("fooCount");
spark.createDataset(Collections.singletonList(1024), Encoders.INT())
.foreach((ForeachFunction) fooCount::add);
try {
TimeUnit.DAYS.sleep(365 * 10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
启动的时候注意观察控制台上输出的Spark Web UI的地址:
打开此链接,点进去Jobs-->Stage,可以看到fooCount累加器的值已经被累加到了1024:
二、Accumulator的简单使用
Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 累加器的基本使用
*
* @author CC11001100
*/
public class AccumulatorsSimpleUseDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
// 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator
// LongAccumulator: 数值型累加
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
// DoubleAccumulator: 小数型累加
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
// CollectionAccumulator:集合累加
CollectionAccumulator collectionAccumulator = sc.collectionAccumulator("double-account");
Dataset num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
Dataset num2 = num1.map((MapFunction) x -> {
longAccumulator.add(x);
doubleAccumulator.add(x);
collectionAccumulator.add(x);
return x;
}, Encoders.INT()).cache();
num2.count();
System.out.println("longAccumulator: " + longAccumulator.value());
System.out.println("doubleAccumulator: " + doubleAccumulator.value());
// 注意,集合中元素的顺序是无法保证的,多运行几次发现每次元素的顺序都可能会变化
System.out.println("collectionAccumulator: " + collectionAccumulator.value());
}
}
三、自定义Accumulator
当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。
实现自定义累加器的步骤:
1. 继承AccumulatorV2,实现相关方法
2. 创建自定义Accumulator的实例,然后在SparkContext上注册它
假设要累加的数非常大,内置的LongAccumulator已经无法满足需求,下面是一个简单的例子用来累加BigInteger:package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
/**
* 自定义累加器
*
* @author CC11001100
*/
public class CustomAccumulatorDemo {
// 需要注意的是累加操作不能依赖顺序,比如类似于StringAccumulator这种则会得到错误的结果
public static class BigIntegerAccumulator extends AccumulatorV2 {
private BigInteger num = BigInteger.ZERO;
public BigIntegerAccumulator() {
}
public BigIntegerAccumulator(BigInteger num) {
this.num = new BigInteger(num.toString());
}
@Override
public boolean isZero() {
return num.compareTo(BigInteger.ZERO) == 0;
}
@Override
public AccumulatorV2 copy() {
return new BigIntegerAccumulator(num);
}
@Override
public void reset() {
num = BigInteger.ZERO;
}
@Override
public void add(BigInteger num) {
this.num = this.num.add(num);
}
@Override
public void merge(AccumulatorV2 other) {
num = num.add(other.value());
}
@Override
public BigInteger value() {
return num;
}
}
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
// 直接new自定义的累加器
BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();
// 然后在SparkContext上注册一下
sc.register(bigIntegerAccumulator, "bigIntegerAccumulator");
List numList = Arrays.asList(new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"));
Dataset num = spark.createDataset(numList, Encoders.kryo(BigInteger.class));
Dataset num2 = num.map((MapFunction) x -> {
bigIntegerAccumulator.add(x);
return x;
}, Encoders.kryo(BigInteger.class));
num2.count();
System.out.println("bigIntegerAccumulator: " + bigIntegerAccumulator.value());
}
}
思考:内置的累加器LongAccumulator、DoubleAccumulator、CollectionAccumulator和我上面的自定义BigIntegerAccumulator,它们都有一个共同的特点,就是最终的结果不受累加数据顺序的影响(对于CollectionAccumulator来说,可以简单的将结果集看做是一个无序Set),看到网上有博主举例子StringAccumulator,这个就是一个错误的例子,就相当于开了一百个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,最后追加出来的字符串是无法被预测的。总结一下就是累加器的最终结果应该不受累加顺序的影响,否则就要重新审视一下这个累加器的设计是否合理。
四、使用Accumulator的陷阱
来讨论一下使用累加器的一些陷阱,累加器的累加是在Task中进行的,而这些Task就是我们在Dataset上调用的一些算子操作,这些算子操作有Transform的,也有Action的,来探讨一下不同类型的算子对Accumulator有什么影响。package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 累加器使用的陷阱
*
* @author CC11001100
*/
public class AccumulatorTrapDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
// ------------------------------- 在transform算子中的错误使用 -------------------------------------------
Dataset num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
Dataset nums2 = num1.map((MapFunction) x -> {
longAccumulator.add(1);
return x;
}, Encoders.INT());
// 因为没有Action操作,nums.map并没有被执行,因此此时广播变量的值还是0
System.out.println("num2 1: " + longAccumulator.value()); // 0
// 调用一次action操作,num.map得到执行,广播变量被改变
nums2.count();
System.out.println("num2 2: " + longAccumulator.value()); // 3
// 又调用了一次Action操作,广播变量所在的map又被执行了一次,所以累加器又被累加了一遍,就悲剧了
nums2.count();
System.out.println("num2 3: " + longAccumulator.value()); // 6
// ------------------------------- 在transform算子中的正确使用 -------------------------------------------
// 累加器不应该被重复使用,或者在合适的时候进行cache断开与之前Dataset的血缘关系,因为cache了就不必重复计算了
longAccumulator.setValue(0);
Dataset nums3 = num1.map((MapFunction) x -> {
longAccumulator.add(1);
return x;
}, Encoders.INT()).cache(); // 注意这个地方进行了cache
// 因为没有Action操作,nums.map并没有被执行,因此此时广播变量的值还是0
System.out.println("num3 1: " + longAccumulator.value()); // 0
// 调用一次action操作,广播变量被改变
nums3.count();
System.out.println("num3 2: " + longAccumulator.value()); // 3
// 又调用了一次Action操作,因为前一次调用count时num3已经被cache,num2.map不会被再执行一遍,所以这里的值还是3
nums3.count();
System.out.println("num3 3: " + longAccumulator.value()); // 3
// ------------------------------- 在action算子中的使用 -------------------------------------------
longAccumulator.setValue(0);
num1.foreach(x -> {
longAccumulator.add(1);
});
// 因为是Action操作,会被立即执行所以打印的结果是符合预期的
System.out.println("num4: " + longAccumulator.value()); // 3
}
}
五、Accumulator使用的奇淫技巧
累加器并不是只能用来实现加法,也可以用来实现减法,直接把要累加的数值改成负数就可以了:package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 使用累加器实现减法
*
* @author CC11001100
*/
public class AccumulatorSubtraction {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
Dataset nums = spark.createDataset(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), Encoders.INT());
LongAccumulator longAccumulator = spark.sparkContext().longAccumulator("AccumulatorSubtraction");
nums.foreach(x -> {
if (x % 3 == 0) {
longAccumulator.add(-2);
} else {
longAccumulator.add(1);
}
});
System.out.println("longAccumulator: " + longAccumulator.value()); // 2
}
}
相关资料:
.
累加器 java_Spark笔记之累加器(Accumulator)相关推荐
- PySpark 累加器使用及自定义累加器
累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每 ...
- 计算机累加器有加法器功能吗,累加器是什么_累加器的作用及原理介绍
累加器的概念 在中央处理器中,累加器(accumulator)是一种寄存器,用来储存计算产生的中间结果.如果没有像累加器这样的寄存器,那么在每次计算(加法,乘法,移位等等)后就必须要把结果写回到内存, ...
- java 累加器_Spark基础教程——累加器的使用(JAVA)
通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱 ...
- 密码学之累加器【笔记】
累加器 累加器可将多个值累加到一个值,可用于隐藏每个被累加的值,并对被累加的每个值做认证.累加器可以用函数hn:Xn∗Yn→Xnh_n:X_n*Y_n\to X_nhn:Xn∗Yn→Xn定义为 ...
- FPGA学习笔记25--乘累加器
module MAC(out,opa,opb,clk,clr); output[15:0] out; input[7:0] opa,opb; input clk,clr; wire[15:0] sum ...
- python 累加器_python中的累加器
你想问的第一件事是当你print tries时,你得到了什么? 实际上,你会发现你总是得到1.在 因为,你没有增加.你没有添加,因为用户一直在猜测.在 通常,您可以为每个猜测编写tries = tri ...
- 嵌入式开发学习笔记5-了解单片机中的特殊功能寄存器(寄存器B、累加器A和程序状态字PSW)
嵌入式开发学习笔记5-了解单片机中的特殊功能寄存器(寄存器.累加器和程序状态字) 累加器A 寄存器B 程序状态字PSW 累加器A 累加器A是ACC(Accumulator)的缩写,累加器A是一个具有特 ...
- Flink的累加器(Accumulator)应用
1. 累加器的简单介绍 累加器是从用户函数和操作中,分布式地统计或者聚合信息.每个并行实例创建并更新自己的Accumulator对象, 然后合并收集器的不同并行实例.在作业结束时由系统合并. 累加器的 ...
- Spark Accumulator累加器
什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...
最新文章
- 越南黑客组织APT32正对NSA网络武器开展研究利用
- 从工作的角度,NLP/CV/推荐系统选哪个?
- 基础-快速排序(基本)
- Scala入门到精通——第二十三节 高级类型 (二)
- [Network Architecture]DPN(Dual Path Network)算法详解(转)
- 谷歌浏览器地址栏记录怎么删除 Chrome浏览器地址栏记录清除方法
- 转-开源云API竞争大幕开启
- python函数多次调用内存溢出_关于循环多次(具体160次左右)获取同一个句柄对象,不同实例,内存溢出的问题。...
- Atitit.数据库表的物理存储结构原理与架构设计与实践
- python 24点题目分析_24点游戏的递归解法和Python实现
- C-Free5.0破解码
- Hybrid App基础知识
- vivado配置EMIO(使用vivado配置XDC文件)
- 如何使用CorelDRAW 2019快速制作幻影图像效果
- mysql配置文件生效测试
- 《西瓜书》笔记整理——第一章
- String Matching 字符串匹配算法——干货从头放到尾
- 校园点餐系统:点餐、食堂管理、商户管理和菜品管理(Java和MySQL)
- Android与H5(JS)互调
- okhttp3.RequestBody.create(Ljava/lang/String;Lokhttp3/MediaType;)Lokhttp3/RequestBody
热门文章
- 欧拉角中各种角度的名称
- 解读手机拍照的各个参数(108M)
- 【附源码】计算机毕业设计SSM实验室预约管理系统
- 侍魂无限跳服务器,经典游戏《真侍魂》橘右京的无限残像,对手就这么活活的被挡死了...
- 下载国外软件慢(例如:Python安装包)
- PDF格式转换、合并、分割、加水印、加页码各种操作,收藏这几个网站就够啦
- win10/11下wsl2安装gpu版的pytorch(避坑指南)
- EMU8086开发工具_微机原理
- 北京房价当日报20141013
- js 判断是否为 Android IOS IPAD IPHONE 等移动设备访问