Spark Accumulator累加器
什么是累加器
累加器用来对信息进行聚合
1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量)
2 算子使用的其实都是driver里的变量的一个副本
3 如果想要影响driver里的变量,需要搜集数据到Driver端才行
4 除了搜集之外,Spark提供的累加器也可以完成对Driver中的变量的更新.
为何需要累加器?
算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量)
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//sum是在driver上的sumvar sum = 0//算子是在worker里的executor上里执行的arr.foreach(x => {//sum是driver上传送过来的,初始值0,然后再worker上进行累加,并没有累加到driver端的sum上sum += x})//打印的是driver自己的sum,所以结果是0println(sum) //0}
}
不用累加器进行求和
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//sum是在driver上的sumvar sum = 0arr.collect()foreach(x => {sum += x})println(sum) //36}
}
低版本累加器
- 低版本累加器,可以帮我们完成求和等操作
- SparkContext有一个accumulator方法
- 调用时,传入一个初始值
- 在累加时,调用累加器的add方法
- 在获取累加器的值时,调用累加器的value方法
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("count").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//使用低版本累加器var myacc: Accumulator[Int] =sc.accumulator(0)arr.foreach(x=>{myacc.add(x)})println(myacc.value)//36}
}
高版本累加器AccumulatorV2
- 本身是个抽象类
- 有一些可用的子类累加器 比如 CollectionAccumulator,DoubleAccumulator,LongAccumulator
- 使用时需要创建子类型对象并在Spark-Context里面注册
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sum").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//创建累加器对象//An accumulator for computing sum, count, and average of 64-bit integers.var myAcc = new LongAccumulator()//向上下文注册累加器//Register the given accumulator with given name.sc.register(myAcc, "sum")arr.foreach(x => {myAcc.add(x)})println(myAcc.value) //36}
}
自定义累加器
1 继承AccumulatorV2
2 规定泛型,第一个泛型是要输入的数据类型,第二个是要输出的数据类型
3 定义一个成员变量
object Test_021 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sum").setMaster("local")val sc = new SparkContext(conf)var arr = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)//创建累加器对象//An accumulator for computing sum, count, and average of 64-bit integers.var myAcc = new SumAccumulator//向上下文注册累加器//Register the given accumulator with given name.sc.register(myAcc, "sum")arr.foreach(x => {myAcc.add(x)})println(myAcc.value) //36}
}class SumAccumulator extends AccumulatorV2[Long,Long]{//定义一个变量,存储累加后的结果var sum:Long=0//判断累加器是否为空,true表示空override def isZero: Boolean = {//sum的结果为0表示没有累加过,即为空sum==0}//复制累加器对象到别的worker上,也就是创建一个新的累加器对象override def copy(): AccumulatorV2[Long, Long] ={val other =new SumAccumulator//将累加器的对象的值得对象赋值到新的累加器对象上other.sum=this.sumother}//重置累加器,就是回归初始值override def reset(): Unit = {sum=0}//将要累加的数据累加到累加器的值上override def add(v: Long): Unit = {sum+=v}//用于两两合并累加器的值,override def merge(other: AccumulatorV2[Long, Long]): Unit = {sum+=other.value}override def value: Long = {sum}
}
利用自定义累加器统计单词
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
//利用累加器统计单词
object _TestAcc {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wordcount").setMaster("local")val sc = new SparkContext(conf)val words: RDD[String] = sc.parallelize(Array("hello", "word", "hello", "word", "kitty", "word"))val myAcc = new WordCountAccumulatorsc.register(myAcc)words.foreach(myAcc.add)for (elem <- myAcc.value) {println(elem)}}
}class WordCountAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {//成员变量的维护var map = new mutable.HashMap[String, Int]()override def isZero: Boolean = {map.isEmpty}override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {val newAcc = new WordCountAccumulatornewAcc.map = this.mapnewAcc}override def reset(): Unit = {map.clear()}override def add(v: String): Unit = {//分区类累加,查看这个单词是否存在map中,如果不存在,则value是1,如果存在,取出value,累加1map模式匹配只有两种,要么None,要么Some(value)map.get(v) match {case None => map.put(v, 1)case Some(x) => map.put(v, x + 1)}}override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {//两个累加器进行合并时,如果有相同单词,就累加value值.如果没有相同的单词,就直接封装原来的值for (elem <- other.value) {//表示的是other里的每一个单词的kv对象//查看this的map中是否有other里的这个单词map.get(elem._1) match {case Some(e) => map.put(elem._1, e + elem._2)case None => map.put(elem._1, elem._2)}}}override def value: mutable.HashMap[String, Int] = {map}
}
注意事项
1.累加器的创建:
1.1.创建一个累加器的实例
1.2.通过sc.register()注册一个累加器
1.3.通过累加器实名.add来添加数据
1.4.通过累加器实例名.value来获取累加器的值
2.最好不要在转换操作中访问累加器(因为血统的关系和转换操作可能执行多次),最好在行动操作中访问
3 由于最终还是要返回到Driver端进行汇报,因此要注意累加的数据量结果的大小问题.
作用:
1.能够精确的统计数据的各种数据例如:
可以统计出符合userID的记录数,在同一个时间段内产生了多少次购买,可以使用ETL进行数据清洗,并使用Accumulator来进行数据的统计
2.作为调试工具,能够观察每个task的信息,通过累加器可以在sparkIUI观察到每个task所处理的记录数
Spark Accumulator累加器相关推荐
- Spark 之 Accumulator 累加器
Spark Accumulator 累加器作用 源码 累加器原理图 Spark中累加器的执行流程: 累加器使用demo spark ui 使用累加器中可能遇到的坑 线程安全问题 累加器作用 累加器:分 ...
- Accumulator累加器(一)
累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...
- spark中累加器的使用(转)
环境: ubuntu16.04 64 伪分布式 使用的spark是2.3.1 scala 2.11.8 参考连接: https://blog.csdn.net/android_xue/article/ ...
- spark变量使用broadcast、accumulator
broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broa ...
- Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)
1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...
- Spark编程指引(四)----共享变量(广播变量和累加器)
转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...
- Spark Java API:broadcast、accumulator
broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broa ...
- spark共享变量(广播变量Broadcast Variable,累加器Accumulators)
2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...
- Spark共享变量(广播变量、累加器)
Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...
最新文章
- 测试两个主机之间的连通性_UCloud 全链路大规模网络连通性检测系统详解
- CrazePony飞行器--相关资料网址
- 图像识别:微信跳一跳机器人
- SAS Viya调研概述
- php cannot bind port to socket,PHP基于socket实现客户端和服务端通讯功能
- 硅谷初创企业控制成本 裁员风渐起
- MATLAB拟合圆函数
- 计算机入职规划,入职后的工作生涯规划范文
- NAO机器人语音识别
- python绘制隐含波动率曲面_隐含波动率曲面
- DDS每个数据包和域ID大小的数据开销
- 带通滤波器是什么,它的原理是什么
- 【云原生】Helm 常用命令(chart 安装、升级、回滚、卸载等操作)
- 服务器所在文件夹路径,服务器上文件夹路径
- springboot内嵌tomcat如何优雅开启http端口
- 【读报告】基于物联网技术的道岔转换设备检测专家平台的研究 研制报告
- 1. 不吹不擂,第一篇就能提升你对Bean Validation数据校验的认知
- 清明节到来,微信公众号图文排版有哪些使用技巧?
- C语言学习之路——程序设计概述
- IPv6的被请求节点的组播地址
热门文章
- windows系统bat批处 注册一个exe执行文件变成服务
- jquery 如何插入元素
- eltable 无数据文案修改_el-table的二次封装详细版(一)
- php 日期 星期_php日期如何转星期
- IDEA插件推荐:Material Theme UI(把IDEA变得更加美观)
- 你真的理解a -- -- a a++ ++a 吗?
- 直接请求接口_【分享】接口是什么?实现原理的是什么?
- vscode 打开函数表_效率倍增!10个超级好用的VScode使用技巧!
- 计算机机房维护保养计划表,机房日常维护保养计划
- atom对比 vscode_几款前端IDE工具:Sublime、Atom、VSCode比较