一、累加器简介

在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

使用累加器时需要注意只有Driver能够取到累加器的值,Task端进行的是累加操作。

创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名,下面探讨如何在Spark Web UI上查看累加器的值。

示例代码:package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.api.java.function.ForeachFunction;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.util.LongAccumulator;

import java.util.Collections;

import java.util.concurrent.TimeUnit;

/**

* @author CC11001100

*/

public class SparkWebUIShowAccumulatorDemo {

public static void main(String[] args) {

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

LongAccumulator fooCount = spark.sparkContext().longAccumulator("fooCount");

spark.createDataset(Collections.singletonList(1024), Encoders.INT())

.foreach((ForeachFunction) fooCount::add);

try {

TimeUnit.DAYS.sleep(365 * 10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

启动的时候注意观察控制台上输出的Spark Web UI的地址:

打开此链接,点进去Jobs-->Stage,可以看到fooCount累加器的值已经被累加到了1024:

二、Accumulator的简单使用

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.util.CollectionAccumulator;

import org.apache.spark.util.DoubleAccumulator;

import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**

* 累加器的基本使用

*

* @author CC11001100

*/

public class AccumulatorsSimpleUseDemo {

public static void main(String[] args) {

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

SparkContext sc = spark.sparkContext();

// 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator

// LongAccumulator: 数值型累加

LongAccumulator longAccumulator = sc.longAccumulator("long-account");

// DoubleAccumulator: 小数型累加

DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");

// CollectionAccumulator:集合累加

CollectionAccumulator collectionAccumulator = sc.collectionAccumulator("double-account");

Dataset num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());

Dataset num2 = num1.map((MapFunction) x -> {

longAccumulator.add(x);

doubleAccumulator.add(x);

collectionAccumulator.add(x);

return x;

}, Encoders.INT()).cache();

num2.count();

System.out.println("longAccumulator: " + longAccumulator.value());

System.out.println("doubleAccumulator: " + doubleAccumulator.value());

// 注意,集合中元素的顺序是无法保证的,多运行几次发现每次元素的顺序都可能会变化

System.out.println("collectionAccumulator: " + collectionAccumulator.value());

}

}

三、自定义Accumulator

当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。

实现自定义累加器的步骤:

1. 继承AccumulatorV2,实现相关方法

2. 创建自定义Accumulator的实例,然后在SparkContext上注册它

假设要累加的数非常大,内置的LongAccumulator已经无法满足需求,下面是一个简单的例子用来累加BigInteger:package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.util.AccumulatorV2;

import java.math.BigInteger;

import java.util.Arrays;

import java.util.List;

/**

* 自定义累加器

*

* @author CC11001100

*/

public class CustomAccumulatorDemo {

// 需要注意的是累加操作不能依赖顺序,比如类似于StringAccumulator这种则会得到错误的结果

public static class BigIntegerAccumulator extends AccumulatorV2 {

private BigInteger num = BigInteger.ZERO;

public BigIntegerAccumulator() {

}

public BigIntegerAccumulator(BigInteger num) {

this.num = new BigInteger(num.toString());

}

@Override

public boolean isZero() {

return num.compareTo(BigInteger.ZERO) == 0;

}

@Override

public AccumulatorV2 copy() {

return new BigIntegerAccumulator(num);

}

@Override

public void reset() {

num = BigInteger.ZERO;

}

@Override

public void add(BigInteger num) {

this.num = this.num.add(num);

}

@Override

public void merge(AccumulatorV2 other) {

num = num.add(other.value());

}

@Override

public BigInteger value() {

return num;

}

}

public static void main(String[] args) {

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

SparkContext sc = spark.sparkContext();

// 直接new自定义的累加器

BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();

// 然后在SparkContext上注册一下

sc.register(bigIntegerAccumulator, "bigIntegerAccumulator");

List numList = Arrays.asList(new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"));

Dataset num = spark.createDataset(numList, Encoders.kryo(BigInteger.class));

Dataset num2 = num.map((MapFunction) x -> {

bigIntegerAccumulator.add(x);

return x;

}, Encoders.kryo(BigInteger.class));

num2.count();

System.out.println("bigIntegerAccumulator: " + bigIntegerAccumulator.value());

}

}

思考:内置的累加器LongAccumulator、DoubleAccumulator、CollectionAccumulator和我上面的自定义BigIntegerAccumulator,它们都有一个共同的特点,就是最终的结果不受累加数据顺序的影响(对于CollectionAccumulator来说,可以简单的将结果集看做是一个无序Set),看到网上有博主举例子StringAccumulator,这个就是一个错误的例子,就相当于开了一百个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,最后追加出来的字符串是无法被预测的。总结一下就是累加器的最终结果应该不受累加顺序的影响,否则就要重新审视一下这个累加器的设计是否合理。

四、使用Accumulator的陷阱

来讨论一下使用累加器的一些陷阱,累加器的累加是在Task中进行的,而这些Task就是我们在Dataset上调用的一些算子操作,这些算子操作有Transform的,也有Action的,来探讨一下不同类型的算子对Accumulator有什么影响。package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**

* 累加器使用的陷阱

*

* @author CC11001100

*/

public class AccumulatorTrapDemo {

public static void main(String[] args) {

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

SparkContext sc = spark.sparkContext();

LongAccumulator longAccumulator = sc.longAccumulator("long-account");

// ------------------------------- 在transform算子中的错误使用 -------------------------------------------

Dataset num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());

Dataset nums2 = num1.map((MapFunction) x -> {

longAccumulator.add(1);

return x;

}, Encoders.INT());

// 因为没有Action操作,nums.map并没有被执行,因此此时广播变量的值还是0

System.out.println("num2 1: " + longAccumulator.value()); // 0

// 调用一次action操作,num.map得到执行,广播变量被改变

nums2.count();

System.out.println("num2 2: " + longAccumulator.value()); // 3

// 又调用了一次Action操作,广播变量所在的map又被执行了一次,所以累加器又被累加了一遍,就悲剧了

nums2.count();

System.out.println("num2 3: " + longAccumulator.value()); // 6

// ------------------------------- 在transform算子中的正确使用 -------------------------------------------

// 累加器不应该被重复使用,或者在合适的时候进行cache断开与之前Dataset的血缘关系,因为cache了就不必重复计算了

longAccumulator.setValue(0);

Dataset nums3 = num1.map((MapFunction) x -> {

longAccumulator.add(1);

return x;

}, Encoders.INT()).cache(); // 注意这个地方进行了cache

// 因为没有Action操作,nums.map并没有被执行,因此此时广播变量的值还是0

System.out.println("num3 1: " + longAccumulator.value()); // 0

// 调用一次action操作,广播变量被改变

nums3.count();

System.out.println("num3 2: " + longAccumulator.value()); // 3

// 又调用了一次Action操作,因为前一次调用count时num3已经被cache,num2.map不会被再执行一遍,所以这里的值还是3

nums3.count();

System.out.println("num3 3: " + longAccumulator.value()); // 3

// ------------------------------- 在action算子中的使用 -------------------------------------------

longAccumulator.setValue(0);

num1.foreach(x -> {

longAccumulator.add(1);

});

// 因为是Action操作,会被立即执行所以打印的结果是符合预期的

System.out.println("num4: " + longAccumulator.value()); // 3

}

}

五、Accumulator使用的奇淫技巧

累加器并不是只能用来实现加法,也可以用来实现减法,直接把要累加的数值改成负数就可以了:package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**

* 使用累加器实现减法

*

* @author CC11001100

*/

public class AccumulatorSubtraction {

public static void main(String[] args) {

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

Dataset nums = spark.createDataset(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), Encoders.INT());

LongAccumulator longAccumulator = spark.sparkContext().longAccumulator("AccumulatorSubtraction");

nums.foreach(x -> {

if (x % 3 == 0) {

longAccumulator.add(-2);

} else {

longAccumulator.add(1);

}

});

System.out.println("longAccumulator: " + longAccumulator.value()); // 2

}

}

相关资料:

.

累加器 java_Spark笔记之累加器(Accumulator)相关推荐

  1. PySpark 累加器使用及自定义累加器

    累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每 ...

  2. 计算机累加器有加法器功能吗,累加器是什么_累加器的作用及原理介绍

    累加器的概念 在中央处理器中,累加器(accumulator)是一种寄存器,用来储存计算产生的中间结果.如果没有像累加器这样的寄存器,那么在每次计算(加法,乘法,移位等等)后就必须要把结果写回到内存, ...

  3. java 累加器_Spark基础教程——累加器的使用(JAVA)

    通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱 ...

  4. 密码学之累加器【笔记】

    累加器 累加器可将多个值累加到一个值,可用于隐藏每个被累加的值,并对被累加的每个值做认证.累加器可以用函数hn:Xn∗Yn→Xnh_n:X_n*Y_n\to X_nhn​:Xn​∗Yn​→Xn​定义为 ...

  5. FPGA学习笔记25--乘累加器

    module MAC(out,opa,opb,clk,clr); output[15:0] out; input[7:0] opa,opb; input clk,clr; wire[15:0] sum ...

  6. python 累加器_python中的累加器

    你想问的第一件事是当你print tries时,你得到了什么? 实际上,你会发现你总是得到1.在 因为,你没有增加.你没有添加,因为用户一直在猜测.在 通常,您可以为每个猜测编写tries = tri ...

  7. 嵌入式开发学习笔记5-了解单片机中的特殊功能寄存器(寄存器B、累加器A和程序状态字PSW)

    嵌入式开发学习笔记5-了解单片机中的特殊功能寄存器(寄存器.累加器和程序状态字) 累加器A 寄存器B 程序状态字PSW 累加器A 累加器A是ACC(Accumulator)的缩写,累加器A是一个具有特 ...

  8. Flink的累加器(Accumulator)应用

    1. 累加器的简单介绍 累加器是从用户函数和操作中,分布式地统计或者聚合信息.每个并行实例创建并更新自己的Accumulator对象, 然后合并收集器的不同并行实例.在作业结束时由系统合并. 累加器的 ...

  9. Spark Accumulator累加器

    什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...

最新文章

  1. 越南黑客组织APT32正对NSA网络武器开展研究利用
  2. 从工作的角度,NLP/CV/推荐系统选哪个?
  3. 基础-快速排序(基本)
  4. Scala入门到精通——第二十三节 高级类型 (二)
  5. [Network Architecture]DPN(Dual Path Network)算法详解(转)
  6. 谷歌浏览器地址栏记录怎么删除 Chrome浏览器地址栏记录清除方法
  7. 转-开源云API竞争大幕开启
  8. python函数多次调用内存溢出_关于循环多次(具体160次左右)获取同一个句柄对象,不同实例,内存溢出的问题。...
  9. Atitit.数据库表的物理存储结构原理与架构设计与实践
  10. python 24点题目分析_24点游戏的递归解法和Python实现
  11. C-Free5.0破解码
  12. Hybrid App基础知识
  13. vivado配置EMIO(使用vivado配置XDC文件)
  14. 如何使用CorelDRAW 2019快速制作幻影图像效果
  15. mysql配置文件生效测试
  16. 《西瓜书》笔记整理——第一章
  17. String Matching 字符串匹配算法——干货从头放到尾
  18. 校园点餐系统:点餐、食堂管理、商户管理和菜品管理(Java和MySQL)
  19. Android与H5(JS)互调
  20. okhttp3.RequestBody.create(Ljava/lang/String;Lokhttp3/MediaType;)Lokhttp3/RequestBody

热门文章

  1. 欧拉角中各种角度的名称
  2. 解读手机拍照的各个参数(108M)
  3. 【附源码】计算机毕业设计SSM实验室预约管理系统
  4. 侍魂无限跳服务器,经典游戏《真侍魂》橘右京的无限残像,对手就这么活活的被挡死了...
  5. 下载国外软件慢(例如:Python安装包)
  6. PDF格式转换、合并、分割、加水印、加页码各种操作,收藏这几个网站就够啦
  7. win10/11下wsl2安装gpu版的pytorch(避坑指南)
  8. EMU8086开发工具_微机原理
  9. 北京房价当日报20141013
  10. js 判断是否为 Android IOS IPAD IPHONE 等移动设备访问