什么是累加器

累加器用来对信息进行聚合
1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量)
2 算子使用的其实都是driver里的变量的一个副本
3 如果想要影响driver里的变量,需要搜集数据到Driver端才行
4 除了搜集之外,Spark提供的累加器也可以完成对Driver中的变量的更新.

为何需要累加器?

算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量)

object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//sum是在driver上的sumvar sum = 0//算子是在worker里的executor上里执行的arr.foreach(x => {//sum是driver上传送过来的,初始值0,然后再worker上进行累加,并没有累加到driver端的sum上sum += x})//打印的是driver自己的sum,所以结果是0println(sum) //0}
}

不用累加器进行求和

object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//sum是在driver上的sumvar sum = 0arr.collect()foreach(x => {sum += x})println(sum) //36}
}

低版本累加器

  • 低版本累加器,可以帮我们完成求和等操作
  • SparkContext有一个accumulator方法
  • 调用时,传入一个初始值
  • 在累加时,调用累加器的add方法
  • 在获取累加器的值时,调用累加器的value方法
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//使用低版本累加器var myacc: Accumulator[Int] =sc.accumulator(0)arr.foreach(x=>{myacc.add(x)})println(myacc.value)//36}
}

高版本累加器AccumulatorV2

  • 本身是个抽象类
  • 有一些可用的子类累加器 比如 CollectionAccumulator,DoubleAccumulator,LongAccumulator
  • 使用时需要创建子类型对象并在Spark-Context里面注册
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sum").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//创建累加器对象//An accumulator for computing sum, count, and average of 64-bit integers.var myAcc = new LongAccumulator()//向上下文注册累加器//Register the given accumulator with given name.sc.register(myAcc, "sum")arr.foreach(x => {myAcc.add(x)})println(myAcc.value) //36}
}

自定义累加器

1 继承AccumulatorV2
2 规定泛型,第一个泛型是要输入的数据类型,第二个是要输出的数据类型
3 定义一个成员变量

object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sum").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//创建累加器对象//An accumulator for computing sum, count, and average of 64-bit integers.var myAcc = new SumAccumulator//向上下文注册累加器//Register the given accumulator with given name.sc.register(myAcc, "sum")arr.foreach(x => {myAcc.add(x)})println(myAcc.value) //36}
}class SumAccumulator extends  AccumulatorV2[Long,Long]{//定义一个变量,存储累加后的结果var sum:Long=0//判断累加器是否为空,true表示空override def isZero: Boolean = {//sum的结果为0表示没有累加过,即为空sum==0}//复制累加器对象到别的worker上,也就是创建一个新的累加器对象override def copy(): AccumulatorV2[Long, Long] ={val other =new SumAccumulator//将累加器的对象的值得对象赋值到新的累加器对象上other.sum=this.sumother}//重置累加器,就是回归初始值override def reset(): Unit = {sum=0}//将要累加的数据累加到累加器的值上override def add(v: Long): Unit = {sum+=v}//用于两两合并累加器的值,override def merge(other: AccumulatorV2[Long, Long]): Unit = {sum+=other.value}override def value: Long = {sum}
}

利用自定义累加器统计单词

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
//利用累加器统计单词
object _TestAcc {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wordcount").setMaster("local")val sc = new SparkContext(conf)val words: RDD[String] = sc.parallelize(Array("hello", "word", "hello", "word", "kitty", "word"))val myAcc = new WordCountAccumulatorsc.register(myAcc)words.foreach(myAcc.add)for (elem <- myAcc.value) {println(elem)}}
}class WordCountAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {//成员变量的维护var map = new mutable.HashMap[String, Int]()override def isZero: Boolean = {map.isEmpty}override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {val newAcc = new WordCountAccumulatornewAcc.map = this.mapnewAcc}override def reset(): Unit = {map.clear()}override def add(v: String): Unit = {//分区类累加,查看这个单词是否存在map中,如果不存在,则value是1,如果存在,取出value,累加1map模式匹配只有两种,要么None,要么Some(value)map.get(v) match {case None => map.put(v, 1)case Some(x) => map.put(v, x + 1)}}override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {//两个累加器进行合并时,如果有相同单词,就累加value值.如果没有相同的单词,就直接封装原来的值for (elem <- other.value) {//表示的是other里的每一个单词的kv对象//查看this的map中是否有other里的这个单词map.get(elem._1) match {case Some(e) => map.put(elem._1, e + elem._2)case None => map.put(elem._1, elem._2)}}}override def value: mutable.HashMap[String, Int] = {map}
}

注意事项

1.累加器的创建:

1.1.创建一个累加器的实例
1.2.通过sc.register()注册一个累加器
1.3.通过累加器实名.add来添加数据
1.4.通过累加器实例名.value来获取累加器的值

2.最好不要在转换操作中访问累加器(因为血统的关系和转换操作可能执行多次),最好在行动操作中访问

3 由于最终还是要返回到Driver端进行汇报,因此要注意累加的数据量结果的大小问题.

作用:

1.能够精确的统计数据的各种数据例如:
可以统计出符合userID的记录数,在同一个时间段内产生了多少次购买,可以使用ETL进行数据清洗,并使用Accumulator来进行数据的统计

2.作为调试工具,能够观察每个task的信息,通过累加器可以在sparkIUI观察到每个task所处理的记录数

Spark Accumulator累加器相关推荐

  1. Spark 之 Accumulator 累加器

    Spark Accumulator 累加器作用 源码 累加器原理图 Spark中累加器的执行流程: 累加器使用demo spark ui 使用累加器中可能遇到的坑 线程安全问题 累加器作用 累加器:分 ...

  2. Accumulator累加器(一)

    累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...

  3. spark中累加器的使用(转)

    环境: ubuntu16.04 64 伪分布式 使用的spark是2.3.1 scala 2.11.8 参考连接: https://blog.csdn.net/android_xue/article/ ...

  4. spark变量使用broadcast、accumulator

    broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broa ...

  5. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  6. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  7. Spark Java API:broadcast、accumulator

    broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broa ...

  8. spark共享变量(广播变量Broadcast Variable,累加器Accumulators)

    2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...

  9. Spark共享变量(广播变量、累加器)

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...

最新文章

  1. 测试两个主机之间的连通性_UCloud 全链路大规模网络连通性检测系统详解
  2. CrazePony飞行器--相关资料网址
  3. 图像识别:微信跳一跳机器人
  4. SAS Viya调研概述
  5. php cannot bind port to socket,PHP基于socket实现客户端和服务端通讯功能
  6. 硅谷初创企业控制成本 裁员风渐起
  7. MATLAB拟合圆函数
  8. 计算机入职规划,入职后的工作生涯规划范文
  9. NAO机器人语音识别
  10. python绘制隐含波动率曲面_隐含波动率曲面
  11. DDS每个数据包和域ID大小的数据开销
  12. 带通滤波器是什么,它的原理是什么
  13. 【云原生】Helm 常用命令(chart 安装、升级、回滚、卸载等操作)
  14. 服务器所在文件夹路径,服务器上文件夹路径
  15. springboot内嵌tomcat如何优雅开启http端口
  16. 【读报告】基于物联网技术的道岔转换设备检测专家平台的研究 研制报告
  17. 1. 不吹不擂,第一篇就能提升你对Bean Validation数据校验的认知
  18. 清明节到来,微信公众号图文排版有哪些使用技巧?
  19. C语言学习之路——程序设计概述
  20. IPv6的被请求节点的组播地址

热门文章

  1. windows系统bat批处 注册一个exe执行文件变成服务
  2. jquery 如何插入元素
  3. eltable 无数据文案修改_el-table的二次封装详细版(一)
  4. php 日期 星期_php日期如何转星期
  5. IDEA插件推荐:Material Theme UI(把IDEA变得更加美观)
  6. 你真的理解a -- -- a a++ ++a 吗?
  7. 直接请求接口_【分享】接口是什么?实现原理的是什么?
  8. vscode 打开函数表_效率倍增!10个超级好用的VScode使用技巧!
  9. 计算机机房维护保养计划表,机房日常维护保养计划
  10. atom对比 vscode_几款前端IDE工具:Sublime、Atom、VSCode比较