累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

累加器简单使用
Spark内置的提供了Long和Double类型的累加器。下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和。

 val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")val sc = new SparkContext(sparkConf)val accum = sc.longAccumulator("longAccum") //统计奇数的个数val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{if(n%2!=0) accum.add(1L) n%2==0}).reduce(_+_)println("sum: "+sum)println("accum: "+accum.value)sc.stop()

结果为:
sum: 20
accum: 5

这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程理解的不够深入就会遇到两类典型的错误:少加(或者没加)、多加。

少加的情况:

对于如下代码:

 val accum = sc.longAccumulator("longAccum")val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{accum.add(1L)n+1})println("accum: "+accum.value)

执行完毕,打印的值是多少呢?答案是0,因为累加器不会改变spark的lazy的计算模型,即在打印的时候像map这样的transformation还没有真正的执行,从而累加器的值也就不会更新。
多加的情况:

对于如下代码:

 val accum = sc.longAccumulator("longAccum")val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{accum.add(1L)n+1})numberRDD.countprintln("accum1:"+accum.value)numberRDD.reduce(_+_)println("accum2: "+accum.value)

结果我们得到了:
accum1:9

accum2: 18

我们虽然只在map里进行了累加器加1的操作,但是两次得到的累加器的值却不一样,这是由于count和reduce都是action类型的操作,触发了两次作业的提交,所以map算子实际上被执行了了两次,在reduce操作提交作业后累加器又完成了一轮计数,所以最终累加器的值为18。究其原因是因为count虽然促使numberRDD被计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集是,还需要从并行化数据集的部分开始执行计算。解释到这里,这个问题的解决方法也就很清楚了,就是在count之前调用numberRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集而无需从头开始计算了。改成如下代码即可:

 val accum = sc.longAccumulator("longAccum")val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{accum.add(1L)n+1})numberRDD.cache().countprintln("accum1:"+accum.value)numberRDD.reduce(_+_)println("accum2: "+accum.value)

这次两次打印的值就会保持一致了。

自定义累加器
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。
实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法,下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。


import java.utilimport org.apache.spark.util.AccumulatorV2class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()override def isZero: Boolean = {_logArray.isEmpty}override def reset(): Unit = {_logArray.clear()}override def add(v: String): Unit = {_logArray.add(v)}override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {other match {case o: LogAccumulator => _logArray.addAll(o.value)}}override def value: java.util.Set[String] = {java.util.Collections.unmodifiableSet(_logArray)}override def copy(): AccumulatorV2[String, util.Set[String]] = {val newAcc = new LogAccumulator()_logArray.synchronized{newAcc._logArray.addAll(_logArray)}newAcc}
}

测试类


import scala.collection.JavaConversions._import org.apache.spark.{SparkConf, SparkContext}object Main {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")val sc = new SparkContext(sparkConf)val accum = new LogAccumulatorsc.register(accum, "logAccum")val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {val pattern = """^-?(\d+)"""val flag = line.matches(pattern)if (!flag) {accum.add(line)}flag}).map(_.toInt).reduce(_ + _)println("sum: " + sum)for (v <- accum.value) print(v + " ")println()sc.stop()}
}

本例中利用自定义的收集器收集过滤操作中被过滤掉的元素,当然这部分的元素的数据量不能太大。运行结果如下:
sum; 32
7cd 4b 2a

累加器使用的注意点及自定义累加器相关推荐

  1. Spark2.10中使用累加器、注意点以及实现自定义累加器

    累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变.累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数 ...

  2. 【大数据开发】SparkCore——利用广播变量优化ip地址统计、Spark2.x自定义累加器

    文章目录 一.Broadcast广播变量 1.1 广播变量的逻辑过程 1.2 [优化ip地址统计](https://blog.csdn.net/weixin_37090394/article/deta ...

  3. PySpark 累加器使用及自定义累加器

    累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每 ...

  4. 累加器是寄存器吗?寄存器、累加器、暂存器有什么区别?

    什么是寄存器 寄存器,是集成电路中非常重要的一种存储单元,通常由触发器组成.在集成电路设计中,寄存器可分为电路内部使用的寄存器和充当内外部接口的寄存器这两类. 内部寄存器不能被外部电路或软件访问,只是 ...

  5. 累加器实验总结计算机组成,计算机组成原理累加器实验报告

    计算机组成原理累加器实验报告 1 课程设计任务书 学 院 信息学院 专 业 计算机科学与技术 学生姓 名 学 号 设计题 目 研制一台多累加器结构的实验计算机 内容及要求: 利用 EL-JY-II 型 ...

  6. python 累加器_Python编程第5课:累加器,变量与赋值进阶练习

    [回顾]Python编程第4课计数器的练习题答案. 1.C    2.24  3.请见下方 a=8 print("小华的岁数:",a) a=a+23 print("小华爸 ...

  7. 自定义Spark累加器

    一.spark累加器源码 以创建一个long类型的累加器为例查看源码 sc.longAccumulator 跟踪这个longAccumulator这个方法进去可以看到 /*** Create and ...

  8. spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)

    spark项目实战:电商分析平台之各个范围Session步长.访问时长占比统计(需求一) 项目基本信息,架构,需要一览 各个范围Session步长.访问时长占比统计概述 各个范围Session步长.访 ...

  9. spark技术学习与思考(sparkcoresparksql)

    目录 1.spark 基础 1.1 spark 发展历程 1.2 spark 与 mapreduce 对比 1.3 spark 运行模式 1.4 spark 常用命令 1.5 spark 底层执行原理 ...

最新文章

  1. 看完这些能控制大脑的寄生虫,你会怀疑人类!
  2. LInux main.cpp 编码问题 导致影响后面的内容
  3. 青龙羊毛——新快手极速版(搬运,非原创)
  4. 数据结构 【实验7 二叉树基本操作】
  5. linux下nginx安装与设置开机启动
  6. 如何生成高性能的短链接?
  7. Closure--1
  8. python编程多行输入_Python20-02_GUI编程----Text多行文本框详解
  9. ASP.NET Core MVC 打造一个简单的图书馆管理系统 (修正版)(二)数据库初始化、基本登录页面以及授权逻辑的建立...
  10. C# WinForm 基础教程
  11. 移动端一倍图,二倍图尺寸
  12. 使用Wps切分单页PDF文件为多页pdf
  13. 抖音收购musical.ly后,最难过的为什么是快手?
  14. 《遥远的救世主》遵守客观规律(三)——文化属性
  15. 微信公众号教程—记录个人公众号运营(待更新)
  16. 代码审计--25--RIPS详细
  17. xcode更新一直失败的解决办法
  18. 微信小程序太阳落日效果
  19. 2019.11.10
  20. 基于jsp的bbs论坛-(7)jsp网页的实现

热门文章

  1. Kotlin开发利器之协程
  2. .net 2.0安装包打不开_Android——bilibili缓存视频合并教程[2.0]
  3. 机器学习和深度学习的主要术语(中英)
  4. 如何魔改Xilinx Vivado 的MIG IP核
  5. 在线24点计算器工具
  6. 智商和情商哪个更重要
  7. Buffon投针实验
  8. 【C语言】main 函数的正确写法
  9. 教学资源库建设计算机专业,教学资源库建设计算机应用论文
  10. 【图像分割】基于方向谷形检测实现静脉纹路分割附MATLAB代码